diff --git a/engine/api/workflow/dao_node_run.go b/engine/api/workflow/dao_node_run.go index 8db9040a31..57bfa3a010 100644 --- a/engine/api/workflow/dao_node_run.go +++ b/engine/api/workflow/dao_node_run.go @@ -194,11 +194,13 @@ func LoadNodeRunByID(db gorp.SqlExecutor, id int64, loadOpts LoadRunOptions) (*s testsField = withLightNodeRunTestsField } - query := fmt.Sprintf(`select %s %s - from workflow_node_run - where workflow_node_run.id = $1`, nodeRunFields, testsField) + query := fmt.Sprintf(` + SELECT %s %s + FROM workflow_node_run + WHERE workflow_node_run.id = $1 + `, nodeRunFields, testsField) if err := db.SelectOne(&rr, query, id); err != nil { - return nil, sdk.WrapError(err, "Unable to load workflow_node_run node=%d", id) + return nil, sdk.WrapError(err, "unable to load workflow_node_run with id %d", id) } r, err := fromDBNodeRun(rr, loadOpts) @@ -207,23 +209,22 @@ func LoadNodeRunByID(db gorp.SqlExecutor, id int64, loadOpts LoadRunOptions) (*s } if loadOpts.WithArtifacts { - arts, errA := loadArtifactByNodeRunID(db, r.ID) - if errA != nil { - return nil, sdk.WrapError(errA, "LoadNodeRunByID>Error loading artifacts for run %d", r.ID) + arts, err := loadArtifactByNodeRunID(db, r.ID) + if err != nil { + return nil, sdk.WrapError(err, "cannot load artifacts for workflow node run %d", r.ID) } r.Artifacts = arts } if loadOpts.WithStaticFiles { - staticFiles, errS := loadStaticFilesByNodeRunID(db, r.ID) - if errS != nil { - return nil, sdk.WrapError(errS, "LoadNodeRunByID>Error loading static files for run %d", r.ID) + staticFiles, err := loadStaticFilesByNodeRunID(db, r.ID) + if err != nil { + return nil, sdk.WrapError(err, "cannot load static files for workflow node run %d", r.ID) } r.StaticFiles = staticFiles } return r, nil - } //insertWorkflowNodeRun insert in table workflow_node_run diff --git a/engine/api/workflow/execute_node_run.go b/engine/api/workflow/execute_node_run.go index 6567207a0f..848b77bf40 100644 --- a/engine/api/workflow/execute_node_run.go +++ b/engine/api/workflow/execute_node_run.go @@ -10,6 +10,7 @@ import ( "github.com/fsamin/go-dump" "github.com/go-gorp/gorp" + "github.com/sirupsen/logrus" "github.com/ovh/cds/engine/api/action" "github.com/ovh/cds/engine/api/cache" @@ -100,51 +101,51 @@ func syncTakeJobInNodeRun(ctx context.Context, db gorp.SqlExecutor, n *sdk.Workf return report, nil } -func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, nr *sdk.WorkflowNodeRun) (*ProcessorReport, error) { +func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, workflowNodeRun *sdk.WorkflowNodeRun) (*ProcessorReport, error) { var end func() ctx, end = observability.Span(ctx, "workflow.executeNodeRun", - observability.Tag(observability.TagWorkflowRun, nr.Number), - observability.Tag(observability.TagWorkflowNodeRun, nr.ID), - observability.Tag("workflow_node_run_status", nr.Status), + observability.Tag(observability.TagWorkflowRun, workflowNodeRun.Number), + observability.Tag(observability.TagWorkflowNodeRun, workflowNodeRun.ID), + observability.Tag("workflow_node_run_status", workflowNodeRun.Status), ) defer end() - wr, errWr := LoadRunByID(db, nr.WorkflowRunID, LoadRunOptions{}) - if errWr != nil { - return nil, sdk.WrapError(errWr, "workflow.executeNodeRun> unable to load workflow run ID %d", nr.WorkflowRunID) + + wr, err := LoadRunByID(db, workflowNodeRun.WorkflowRunID, LoadRunOptions{}) + if err != nil { + return nil, sdk.WrapError(err, "unable to load workflow run with id %d", workflowNodeRun.WorkflowRunID) } report := new(ProcessorReport) - defer func(wNr *sdk.WorkflowNodeRun) { - report.Add(ctx, *wNr) - }(nr) + defer func(wnr *sdk.WorkflowNodeRun) { + report.Add(ctx, *wnr) + }(workflowNodeRun) - //If status is not waiting neither build: nothing to do - if sdk.StatusIsTerminated(nr.Status) { + // If status is not waiting neither build: nothing to do + if sdk.StatusIsTerminated(workflowNodeRun.Status) { return nil, nil } - var newStatus = nr.Status + var newStatus = workflowNodeRun.Status - //If no stages ==> success - if len(nr.Stages) == 0 { + // If no stages ==> success + if len(workflowNodeRun.Stages) == 0 { newStatus = sdk.StatusSuccess - nr.Done = time.Now() + workflowNodeRun.Done = time.Now() } stagesTerminated := 0 var previousNodeRun *sdk.WorkflowNodeRun - if nr.Manual != nil && nr.Manual.OnlyFailedJobs { - var err error - previousNodeRun, err = checkRunOnlyFailedJobs(wr, nr) + if workflowNodeRun.Manual != nil && workflowNodeRun.Manual.OnlyFailedJobs { + previousNodeRun, err = checkRunOnlyFailedJobs(wr, workflowNodeRun) if err != nil { return report, err } } - //Browse stages - for stageIndex := range nr.Stages { - stage := &nr.Stages[stageIndex] - //Initialize stage status at waiting + // Browse stages + for stageIndex := range workflowNodeRun.Stages { + stage := &workflowNodeRun.Stages[stageIndex] + // Initialize stage status at waiting if stage.Status == "" { var previousStage sdk.Stage // Find previous stage @@ -164,7 +165,7 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, } } else if sdk.StatusIsTerminated(previousStage.Status) { // If stage terminated, recopy it - nr.Stages[stageIndex] = previousStage + workflowNodeRun.Stages[stageIndex] = previousStage stagesTerminated++ continue } @@ -172,10 +173,10 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, if len(stage.Jobs) == 0 { stage.Status = sdk.StatusSuccess } else { - //Add job to Queue - //Insert data in workflow_node_run_job + // Add job to Queue + // Insert data in workflow_node_run_job log.Debug("workflow.executeNodeRun> stage %s call addJobsToQueue", stage.Name) - r, err := addJobsToQueue(ctx, db, stage, wr, nr, &previousStage) + r, err := addJobsToQueue(ctx, db, stage, wr, workflowNodeRun, &previousStage) report.Merge(ctx, r) if err != nil { return report, err @@ -183,7 +184,7 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, log.Debug("workflow.executeNodeRun> stage %s status after call to addJobsToQueue %s", stage.Name, stage.Status) } - // check for failure caused by action not usable or requirements problem + // Check for failure caused by action not usable or requirements problem if sdk.StatusFail == stage.Status { newStatus = sdk.StatusFail break @@ -227,13 +228,13 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, } else { //The stage is over if stage.Status == sdk.StatusFail { - nr.Done = time.Now() + workflowNodeRun.Done = time.Now() newStatus = sdk.StatusFail stagesTerminated++ break } if stage.Status == sdk.StatusStopped { - nr.Done = time.Now() + workflowNodeRun.Done = time.Now() newStatus = sdk.StatusStopped stagesTerminated++ break @@ -241,53 +242,54 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, if sdk.StatusIsTerminated(stage.Status) { stagesTerminated++ - nr.Done = time.Now() + workflowNodeRun.Done = time.Now() } - if stageIndex == len(nr.Stages)-1 { - nr.Done = time.Now() + if stageIndex == len(workflowNodeRun.Stages)-1 { + workflowNodeRun.Done = time.Now() newStatus = sdk.StatusSuccess stagesTerminated++ break } - if stageIndex != len(nr.Stages)-1 { + if stageIndex != len(workflowNodeRun.Stages)-1 { continue } } } } - if stagesTerminated >= len(nr.Stages) || (stagesTerminated >= len(nr.Stages)-1 && (nr.Stages[len(nr.Stages)-1].Status == sdk.StatusDisabled || nr.Stages[len(nr.Stages)-1].Status == sdk.StatusSkipped)) { + if stagesTerminated >= len(workflowNodeRun.Stages) || (stagesTerminated >= len(workflowNodeRun.Stages)-1 && + (workflowNodeRun.Stages[len(workflowNodeRun.Stages)-1].Status == sdk.StatusDisabled || workflowNodeRun.Stages[len(workflowNodeRun.Stages)-1].Status == sdk.StatusSkipped)) { var counterStatus statusCounter - if len(nr.Stages) > 0 { - for _, stage := range nr.Stages { + if len(workflowNodeRun.Stages) > 0 { + for _, stage := range workflowNodeRun.Stages { computeRunStatus(stage.Status, &counterStatus) } newStatus = getRunStatus(counterStatus) } } - nr.Status = newStatus + workflowNodeRun.Status = newStatus - if sdk.StatusIsTerminated(nr.Status) && nr.Status != sdk.StatusNeverBuilt { - nr.Done = time.Now() + if sdk.StatusIsTerminated(workflowNodeRun.Status) && workflowNodeRun.Status != sdk.StatusNeverBuilt { + workflowNodeRun.Done = time.Now() } // Save the node run in database - if err := updateNodeRunStatusAndStage(db, nr); err != nil { - return nil, sdk.WrapError(err, "unable to update node id=%d at status %s", nr.ID, nr.Status) + if err := updateNodeRunStatusAndStage(db, workflowNodeRun); err != nil { + return nil, sdk.WrapError(err, "unable to update node id=%d at status %s", workflowNodeRun.ID, workflowNodeRun.Status) } //Reload the workflow - updatedWorkflowRun, err := LoadRunByID(db, nr.WorkflowRunID, LoadRunOptions{}) + updatedWorkflowRun, err := LoadRunByID(db, workflowNodeRun.WorkflowRunID, LoadRunOptions{}) if err != nil { - return nil, sdk.WrapError(err, "unable to reload workflow run id=%d", nr.WorkflowRunID) + return nil, sdk.WrapError(err, "unable to reload workflow run id=%d", workflowNodeRun.WorkflowRunID) } // If pipeline build succeed, reprocess the workflow (in the same transaction) - //Delete jobs only when node is over - if sdk.StatusIsTerminated(nr.Status) { - if nr.Status != sdk.StatusStopped { + // Delete jobs only when node is over + if sdk.StatusIsTerminated(workflowNodeRun.Status) { + if workflowNodeRun.Status != sdk.StatusStopped { r1, _, err := processWorkflowDataRun(ctx, db, store, proj, updatedWorkflowRun, nil, nil, nil) if err != nil { return nil, sdk.WrapError(err, "unable to reprocess workflow") @@ -295,82 +297,94 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, report.Merge(ctx, r1) } - //Delete the line in workflow_node_run_job - if err := DeleteNodeJobRuns(db, nr.ID); err != nil { - return nil, sdk.WrapError(err, "unable to delete node %d job runs", nr.ID) - } - - var hasMutex bool - var nodeName string - - node := updatedWorkflowRun.Workflow.WorkflowData.NodeByID(nr.WorkflowNodeID) - if node != nil && node.Context != nil && node.Context.Mutex { - hasMutex = node.Context.Mutex - nodeName = node.Name + // Delete the line in workflow_node_run_job + if err := DeleteNodeJobRuns(db, workflowNodeRun.ID); err != nil { + return nil, sdk.WrapError(err, "unable to delete node %d job runs", workflowNodeRun.ID) } - //Do we release a mutex ? - //Try to find one node run of the same node from the same workflow at status Waiting + // If current node has a mutex, we want to trigger another node run that can be waiting for the mutex + node := updatedWorkflowRun.Workflow.WorkflowData.NodeByID(workflowNodeRun.WorkflowNodeID) + hasMutex := node != nil && node.Context != nil && node.Context.Mutex if hasMutex { - _, next := observability.Span(ctx, "workflow.releaseMutex") - - mutexQuery := `select workflow_node_run.id - from workflow_node_run - join workflow_run on workflow_run.id = workflow_node_run.workflow_run_id - join workflow on workflow.id = workflow_run.workflow_id - where workflow.id = $1 - and workflow_node_run.workflow_node_name = $2 - and workflow_node_run.status = $3 - order by workflow_node_run.start asc - limit 1` - waitingRunID, errID := db.SelectInt(mutexQuery, updatedWorkflowRun.WorkflowID, nodeName, string(sdk.StatusWaiting)) - if errID != nil && errID != sql.ErrNoRows { - log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow node run ID: %v", errID) - return report, nil - } - //If not more run is found, stop the loop - if waitingRunID == 0 { - return report, nil - } - waitingRun, errRun := LoadNodeRunByID(db, waitingRunID, LoadRunOptions{}) - if errRun != nil && sdk.Cause(errRun) != sql.ErrNoRows { - log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow rnode un: %v", errRun) - return report, nil - } - //If not more run is found, stop the loop - if waitingRun == nil { - return report, nil - } - - //Here we are loading another workflow run - workflowRun, errWRun := LoadRunByID(db, waitingRun.WorkflowRunID, LoadRunOptions{}) - if errWRun != nil { - log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow rnode un: %v", errWRun) - return report, nil - } - AddWorkflowRunInfo(workflowRun, sdk.SpawnMsg{ - ID: sdk.MsgWorkflowNodeMutexRelease.ID, - Args: []interface{}{waitingRun.WorkflowNodeName}, - Type: sdk.MsgWorkflowNodeMutexRelease.Type, - }) - - if err := UpdateWorkflowRun(ctx, db, workflowRun); err != nil { - return nil, sdk.WrapError(err, "unable to update workflow run %d after mutex release", workflowRun.ID) - } - - log.Debug("workflow.execute> process the node run %d because mutex has been released", waitingRun.ID) - r, err := executeNodeRun(ctx, db, store, proj, waitingRun) + r, err := releaseMutex(ctx, db, store, proj, updatedWorkflowRun.WorkflowID, workflowNodeRun.WorkflowNodeName) report.Merge(ctx, r) if err != nil { - return nil, sdk.WrapError(err, "unable to reprocess workflow") + return report, err } - - next() } } return report, nil } +func releaseMutex(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, workflowID int64, nodeName string) (*ProcessorReport, error) { + _, next := observability.Span(ctx, "workflow.releaseMutex") + defer next() + + mutexQuery := ` + SELECT workflow_node_run.id + FROM workflow_node_run + JOIN workflow_run on workflow_run.id = workflow_node_run.workflow_run_id + JOIN workflow on workflow.id = workflow_run.workflow_id + WHERE workflow.id = $1 + AND workflow_node_run.workflow_node_name = $2 + AND workflow_node_run.status = $3 + ORDER BY workflow_run.num ASC + LIMIT 1 + ` + waitingRunID, err := db.SelectInt(mutexQuery, workflowID, nodeName, string(sdk.StatusWaiting)) + if err != nil && err != sql.ErrNoRows { + err = sdk.WrapError(err, "unable to load mutex-locked workflow node run id") + log.ErrorWithFields(ctx, logrus.Fields{ + "stack_trace": fmt.Sprintf("%+v", err), + }, "%s", err) + return nil, nil + } + if waitingRunID == 0 { + return nil, nil + } + + // Load the workflow node run that is waiting for the mutex + waitingRun, errRun := LoadNodeRunByID(db, waitingRunID, LoadRunOptions{}) + if errRun != nil && sdk.Cause(errRun) != sql.ErrNoRows { + err = sdk.WrapError(err, "unable to load mutex-locked workflow node run") + log.ErrorWithFields(ctx, logrus.Fields{ + "stack_trace": fmt.Sprintf("%+v", err), + }, "%s", err) + return nil, nil + } + if waitingRun == nil { + return nil, nil + } + + // Load the workflow run that is waiting for the mutex + workflowRun, err := LoadRunByID(db, waitingRun.WorkflowRunID, LoadRunOptions{}) + if err != nil { + err = sdk.WrapError(err, "unable to load mutex-locked workflow run") + log.ErrorWithFields(ctx, logrus.Fields{ + "stack_trace": fmt.Sprintf("%+v", err), + }, "%s", err) + return nil, nil + } + + // Add a spawn info on the workflow run + AddWorkflowRunInfo(workflowRun, sdk.SpawnMsg{ + ID: sdk.MsgWorkflowNodeMutexRelease.ID, + Args: []interface{}{waitingRun.WorkflowNodeName}, + Type: sdk.MsgWorkflowNodeMutexRelease.Type, + }) + if err := UpdateWorkflowRun(ctx, db, workflowRun); err != nil { + return nil, sdk.WrapError(err, "unable to update workflow run %d after mutex release", workflowRun.ID) + } + + log.Debug("workflow.execute> process the node run %d because mutex has been released", waitingRun.ID) + r, err := executeNodeRun(ctx, db, store, proj, waitingRun) + if err != nil { + return r, sdk.WrapError(err, "unable to reprocess workflow") + } + + return r, nil +} + func checkRunOnlyFailedJobs(wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun) (*sdk.WorkflowNodeRun, error) { var previousNR *sdk.WorkflowNodeRun nrs, ok := wr.WorkflowNodeRuns[nr.WorkflowNodeID] @@ -853,27 +867,38 @@ func stopWorkflowNodeOutGoingHook(ctx context.Context, dbFunc func() *gorp.DbMap } // StopWorkflowNodeRun to stop a workflow node run with a specific spawn info -func StopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, proj sdk.Project, nodeRun sdk.WorkflowNodeRun, stopInfos sdk.SpawnInfo) (*ProcessorReport, error) { +func StopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, proj sdk.Project, workflowRun sdk.WorkflowRun, workflowNodeRun sdk.WorkflowNodeRun, stopInfos sdk.SpawnInfo) (*ProcessorReport, error) { var end func() ctx, end = observability.Span(ctx, "workflow.StopWorkflowNodeRun") defer end() report := new(ProcessorReport) - var r1 *ProcessorReport - var errS error - if nodeRun.Stages != nil && len(nodeRun.Stages) > 0 { - r1, errS = stopWorkflowNodePipeline(ctx, dbFunc, store, proj, &nodeRun, stopInfos) + var r *ProcessorReport + var err error + if workflowNodeRun.Stages != nil && len(workflowNodeRun.Stages) > 0 { + r, err = stopWorkflowNodePipeline(ctx, dbFunc, store, proj, &workflowNodeRun, stopInfos) } - if nodeRun.OutgoingHook != nil { - errS = stopWorkflowNodeOutGoingHook(ctx, dbFunc, &nodeRun) + if workflowNodeRun.OutgoingHook != nil { + err = stopWorkflowNodeOutGoingHook(ctx, dbFunc, &workflowNodeRun) } - if errS != nil { - return report, sdk.WrapError(errS, "unable to stop workflow node run") + if err != nil { + return report, sdk.WrapError(err, "unable to stop workflow node run") } - report.Merge(ctx, r1) - report.Add(ctx, nodeRun) + report.Merge(ctx, r) + report.Add(ctx, workflowNodeRun) + + // If current node has a mutex, we want to trigger another node run that can be waiting for the mutex + workflowNode := workflowRun.Workflow.WorkflowData.NodeByID(workflowNodeRun.WorkflowNodeID) + hasMutex := workflowNode != nil && workflowNode.Context != nil && workflowNode.Context.Mutex + if hasMutex { + r, err = releaseMutex(ctx, dbFunc(), store, proj, workflowNodeRun.WorkflowID, workflowNodeRun.WorkflowNodeName) + report.Merge(ctx, r) + if err != nil { + return report, err + } + } return report, nil } diff --git a/engine/api/workflow_run.go b/engine/api/workflow_run.go index d7b611f371..5ec8404ee4 100644 --- a/engine/api/workflow_run.go +++ b/engine/api/workflow_run.go @@ -355,26 +355,25 @@ func (api *API) stopWorkflowRunHandler() service.Handler { return sdk.WrapError(err, "unable to load last workflow run") } - proj, errP := project.Load(api.mustDB(), key) - if errP != nil { - return sdk.WrapError(errP, "stopWorkflowRunHandler> Unable to load project") + proj, err := project.Load(api.mustDB(), key) + if err != nil { + return sdk.WrapError(err, "unable to load project") } report, err := stopWorkflowRun(ctx, api.mustDB, api.Cache, proj, run, getAPIConsumer(ctx), 0) if err != nil { - return sdk.WrapError(err, "Unable to stop workflow") + return sdk.WrapError(err, "unable to stop workflow") } - workflowRuns := report.WorkflowRuns() go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *proj, report) go func(ID int64) { - wRun, errLw := workflow.LoadRunByID(api.mustDB(), ID, workflow.LoadRunOptions{DisableDetailledNodeRun: true}) - if errLw != nil { - log.Error(ctx, "workflow.stopWorkflowNodeRun> Cannot load run for resync commit status %v", errLw) + wRun, err := workflow.LoadRunByID(api.mustDB(), ID, workflow.LoadRunOptions{DisableDetailledNodeRun: true}) + if err != nil { + log.Error(ctx, "workflow.stopWorkflowNodeRun> Cannot load run for resync commit status %v", err) return } - //The function could be called with nil project so we need to test if project is not nil + // The function could be called with nil project so we need to test if project is not nil if sdk.StatusIsTerminated(wRun.Status) && proj != nil { wRun.LastExecution = time.Now() if err := workflow.ResyncCommitStatus(context.Background(), api.mustDB(), api.Cache, *proj, wRun); err != nil { @@ -383,12 +382,12 @@ func (api *API) stopWorkflowRunHandler() service.Handler { } }(run.ID) + workflowRuns := report.WorkflowRuns() if len(workflowRuns) > 0 { observability.Current(ctx, observability.Tag(observability.TagProjectKey, proj.Key), observability.Tag(observability.TagWorkflow, workflowRuns[0].Workflow.Name), ) - if workflowRuns[0].Status == sdk.StatusFail { observability.Record(api.Router.Background, api.Metrics.WorkflowRunFailed, 1) } @@ -426,7 +425,7 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache continue } - r1, err := workflow.StopWorkflowNodeRun(ctx, dbFunc, store, *p, wnr, stopInfos) + r1, err := workflow.StopWorkflowNodeRun(ctx, dbFunc, store, *p, *run, wnr, stopInfos) if err != nil { return nil, sdk.WrapError(err, "unable to stop workflow node run %d", wnr.ID) } @@ -657,96 +656,87 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { vars := mux.Vars(r) key := vars["key"] - name := vars["permWorkflowName"] - number, err := requestVarInt(r, "number") + workflowName := vars["permWorkflowName"] + workflowRunNumber, err := requestVarInt(r, "number") if err != nil { return err } - id, err := requestVarInt(r, "nodeRunID") + workflowNodeRunID, err := requestVarInt(r, "nodeRunID") if err != nil { return err } - p, errP := project.Load(api.mustDB(), key, project.LoadOptions.WithVariables) - if errP != nil { - return sdk.WrapError(errP, "stopWorkflowNodeRunHandler> Cannot load project") + p, err := project.Load(api.mustDB(), key, project.LoadOptions.WithVariables) + if err != nil { + return sdk.WrapError(err, "cannot load project") } - // Load node run - nodeRun, err := workflow.LoadNodeRun(api.mustDB(), key, name, number, id, workflow.LoadRunOptions{}) + workflowRun, err := workflow.LoadRun(ctx, api.mustDB(), p.Key, workflowName, workflowRunNumber, workflow.LoadRunOptions{ + WithDeleted: true, + }) if err != nil { - return sdk.WrapError(err, "Unable to load last workflow run") + return sdk.WrapError(err, "unable to load workflow run with number %d for workflow %s", workflowRunNumber, workflowName) } - report, err := api.stopWorkflowNodeRun(ctx, api.mustDB, api.Cache, p, nodeRun, name, getAPIConsumer(ctx)) + workflowNodeRun, err := workflow.LoadNodeRun(api.mustDB(), key, workflowName, workflowRun.Number, workflowNodeRunID, workflow.LoadRunOptions{ + WithDeleted: true, + }) if err != nil { - return sdk.WrapError(err, "Unable to stop workflow run") + return sdk.WrapError(err, "unable to load workflow node run with id %d for workflow %s and run with number %d", workflowNodeRunID, workflowName, workflowRun.Number) } - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, report) - - return service.WriteJSON(w, nodeRun, http.StatusOK) - } -} + report, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *workflowRun, *workflowNodeRun, sdk.SpawnInfo{ + APITime: time.Now(), + RemoteTime: time.Now(), + Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}}, + }) + if err != nil { + return sdk.WrapError(err, "unable to stop workflow node run") + } -func (api *API) stopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, - p *sdk.Project, nodeRun *sdk.WorkflowNodeRun, workflowName string, ident sdk.Identifiable) (*workflow.ProcessorReport, error) { - tx, errTx := dbFunc().Begin() - if errTx != nil { - return nil, sdk.WrapError(errTx, "unable to create transaction") - } - defer tx.Rollback() // nolint + tx, err := api.mustDB().Begin() + if err != nil { + return sdk.WithStack(err) + } + defer tx.Rollback() // nolint - stopInfos := sdk.SpawnInfo{ - APITime: time.Now(), - RemoteTime: time.Now(), - Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{ident.GetUsername()}}, - } - report, err := workflow.StopWorkflowNodeRun(ctx, dbFunc, store, *p, *nodeRun, stopInfos) - if err != nil { - return nil, sdk.WrapError(err, "unable to stop workflow node run") - } + r1, err := workflow.ResyncWorkflowRunStatus(ctx, tx, workflowRun) + report.Merge(ctx, r1) + if err != nil { + return sdk.WrapError(err, "unable to resync workflow run status") + } - wr, errLw := workflow.LoadRun(ctx, tx, p.Key, workflowName, nodeRun.Number, workflow.LoadRunOptions{}) - if errLw != nil { - return nil, sdk.WrapError(errLw, "unable to load workflow run %s", workflowName) - } + observability.Current(ctx, + observability.Tag(observability.TagProjectKey, p.Key), + observability.Tag(observability.TagWorkflow, workflowRun.Workflow.Name), + ) + if workflowRun.Status == sdk.StatusFail { + observability.Record(api.Router.Background, api.Metrics.WorkflowRunFailed, 1) + } - r1, errR := workflow.ResyncWorkflowRunStatus(ctx, tx, wr) - if errR != nil { - return nil, sdk.WrapError(errR, "unable to resync workflow run status") - } + if err := tx.Commit(); err != nil { + return sdk.WithStack(err) + } - report.Merge(ctx, r1) + go func(ID int64) { + wRun, err := workflow.LoadRunByID(api.mustDB(), ID, workflow.LoadRunOptions{DisableDetailledNodeRun: true}) + if err != nil { + log.Error(ctx, "workflow.stopWorkflowNodeRun> Cannot load run for resync commit status %v", err) + return + } + //The function could be called with nil project so we need to test if project is not nil + if sdk.StatusIsTerminated(wRun.Status) && p != nil { + wRun.LastExecution = time.Now() + if err := workflow.ResyncCommitStatus(context.Background(), api.mustDB(), api.Cache, *p, wRun); err != nil { + log.Error(ctx, "workflow.stopWorkflowNodeRun> %v", err) + } + } + }(workflowRun.ID) - observability.Current(ctx, - observability.Tag(observability.TagProjectKey, p.Key), - observability.Tag(observability.TagWorkflow, wr.Workflow.Name), - ) - if wr.Status == sdk.StatusFail { - observability.Record(api.Router.Background, api.Metrics.WorkflowRunFailed, 1) - } + go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, report) - if errC := tx.Commit(); errC != nil { - return nil, sdk.WrapError(errC, "unable to commit") + return service.WriteJSON(w, workflowNodeRun, http.StatusOK) } - - go func(ID int64) { - wRun, errLw := workflow.LoadRunByID(api.mustDB(), ID, workflow.LoadRunOptions{DisableDetailledNodeRun: true}) - if errLw != nil { - log.Error(ctx, "workflow.stopWorkflowNodeRun> Cannot load run for resync commit status %v", errLw) - return - } - //The function could be called with nil project so we need to test if project is not nil - if sdk.StatusIsTerminated(wRun.Status) && p != nil { - wRun.LastExecution = time.Now() - if err := workflow.ResyncCommitStatus(context.Background(), api.mustDB(), api.Cache, *p, wRun); err != nil { - log.Error(ctx, "workflow.stopWorkflowNodeRun> %v", err) - } - } - }(wr.ID) - - return report, nil } func (api *API) getWorkflowNodeRunHandler() service.Handler { diff --git a/engine/api/workflow_run_test.go b/engine/api/workflow_run_test.go index 757c449740..d9de35fbab 100644 --- a/engine/api/workflow_run_test.go +++ b/engine/api/workflow_run_test.go @@ -1662,6 +1662,283 @@ func Test_postWorkflowRunHandlerHookWithMutex(t *testing.T) { assert.Equal(t, sdk.StatusBuilding, lastRun.Status) } +func Test_postWorkflowRunHandlerMutexRelease(t *testing.T) { + api, db, router, end := newTestAPI(t) + defer end() + + u, jwt := assets.InsertAdminUser(t, api.mustDB()) + + // Init test pipeline with one stage and one job + projKey := sdk.RandomString(10) + proj := assets.InsertTestProject(t, db, api.Cache, projKey, projKey) + pip := sdk.Pipeline{ProjectID: proj.ID, ProjectKey: proj.Key, Name: sdk.RandomString(10)} + require.NoError(t, pipeline.InsertPipeline(api.mustDB(), &pip)) + stage := sdk.Stage{PipelineID: pip.ID, Name: sdk.RandomString(10), Enabled: true} + require.NoError(t, pipeline.InsertStage(api.mustDB(), &stage)) + job := &sdk.Job{Enabled: true, Action: sdk.Action{Enabled: true}} + require.NoError(t, pipeline.InsertJob(api.mustDB(), job, stage.ID, &pip)) + + // Init test workflow with one pipeline with mutex + wkf := sdk.Workflow{ + ProjectID: proj.ID, + ProjectKey: proj.Key, + Name: sdk.RandomString(10), + WorkflowData: sdk.WorkflowData{ + Node: sdk.Node{ + Name: "root", + Type: sdk.NodeTypePipeline, + Context: &sdk.NodeContext{ + PipelineID: pip.ID, + Mutex: true, + }, + }, + }, + } + require.NoError(t, workflow.Insert(context.TODO(), api.mustDB(), api.Cache, *proj, &wkf)) + + // Run workflow 1 + uri := router.GetRoute("POST", api.postWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + }) + require.NotEmpty(t, uri) + req := assets.NewAuthentifiedRequest(t, u, jwt, "POST", uri, sdk.WorkflowRunPostHandlerOption{}) + rec := httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 202, rec.Code) + + var try int + for { + if try > 10 { + t.Logf("Maximum attempts reached on getWorkflowRunHandler for run 1") + t.FailNow() + return + } + try++ + t.Logf("Attempt #%d on getWorkflowRunHandler for run 1", try) + uri := router.GetRoute("GET", api.getWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + "number": "1", + }) + req := assets.NewAuthentifiedRequest(t, u, jwt, "GET", uri, nil) + rec := httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + var wkfRun sdk.WorkflowRun + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wkfRun)) + if wkfRun.Status != sdk.StatusBuilding { + t.Logf("Workflow run status: %s", wkfRun.Status) + continue + } + + require.Equal(t, sdk.StatusBuilding, wkfRun.Status) + require.Equal(t, sdk.StatusWaiting, wkfRun.RootRun().Stages[0].Status) + break + } + + // Run workflow 2 + uri = router.GetRoute("POST", api.postWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + }) + require.NotEmpty(t, uri) + req = assets.NewAuthentifiedRequest(t, u, jwt, "POST", uri, sdk.WorkflowRunPostHandlerOption{}) + rec = httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 202, rec.Code) + + try = 0 + for { + if try > 10 { + t.Logf("Maximum attempts reached on getWorkflowRunHandler for run 2") + t.FailNow() + return + } + try++ + t.Logf("Attempt #%d on getWorkflowRunHandler for run 2", try) + uri := router.GetRoute("GET", api.getWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + "number": "2", + }) + req := assets.NewAuthentifiedRequest(t, u, jwt, "GET", uri, nil) + rec := httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + var wkfRun sdk.WorkflowRun + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wkfRun)) + if wkfRun.Status != sdk.StatusBuilding { + t.Logf("Workflow run status: %s", wkfRun.Status) + continue + } + + require.Equal(t, sdk.StatusBuilding, wkfRun.Status) + require.Equal(t, 2, len(wkfRun.Infos)) + require.Equal(t, sdk.MsgWorkflowStarting.ID, wkfRun.Infos[0].Message.ID) + require.Equal(t, sdk.MsgWorkflowNodeMutex.ID, wkfRun.Infos[1].Message.ID) + require.Equal(t, "", wkfRun.RootRun().Stages[0].Status) + break + } + + // Run workflow 3 + uri = router.GetRoute("POST", api.postWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + }) + require.NotEmpty(t, uri) + req = assets.NewAuthentifiedRequest(t, u, jwt, "POST", uri, sdk.WorkflowRunPostHandlerOption{}) + rec = httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 202, rec.Code) + + try = 0 + for { + if try > 10 { + t.Logf("Maximum attempts reached on getWorkflowRunHandler for run 3") + t.FailNow() + return + } + try++ + t.Logf("Attempt #%d on getWorkflowRunHandler for run 3", try) + uri := router.GetRoute("GET", api.getWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + "number": "3", + }) + req := assets.NewAuthentifiedRequest(t, u, jwt, "GET", uri, nil) + rec := httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + var wkfRun sdk.WorkflowRun + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wkfRun)) + if wkfRun.Status != sdk.StatusBuilding { + t.Logf("Workflow run status: %s", wkfRun.Status) + continue + } + + require.Equal(t, sdk.StatusBuilding, wkfRun.Status) + require.Equal(t, 2, len(wkfRun.Infos)) + require.Equal(t, sdk.MsgWorkflowStarting.ID, wkfRun.Infos[0].Message.ID) + require.Equal(t, sdk.MsgWorkflowNodeMutex.ID, wkfRun.Infos[1].Message.ID) + require.Equal(t, "", wkfRun.RootRun().Stages[0].Status) + break + } + + // Stop workflow 1 + uri = router.GetRoute("POST", api.stopWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + "number": "1", + }) + require.NotEmpty(t, uri) + req = assets.NewAuthentifiedRequest(t, u, jwt, "POST", uri, nil) + rec = httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + try = 0 + for { + if try > 10 { + t.Logf("Maximum attempts reached on getWorkflowRunHandler for run 1") + t.FailNow() + return + } + try++ + t.Logf("Attempt #%d on getWorkflowRunHandler for run 1", try) + uri := router.GetRoute("GET", api.getWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + "number": "1", + }) + req := assets.NewAuthentifiedRequest(t, u, jwt, "GET", uri, nil) + rec := httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + var wkfRun sdk.WorkflowRun + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wkfRun)) + if wkfRun.Status != sdk.StatusStopped { + t.Logf("Workflow run status: %s", wkfRun.Status) + continue + } + + require.Equal(t, sdk.StatusStopped, wkfRun.Status) + require.Equal(t, sdk.StatusStopped, wkfRun.RootRun().Stages[0].Status) + break + } + + // Run 2 should be running + try = 0 + for { + if try > 10 { + t.Logf("Maximum attempts reached on getWorkflowRunHandler for run 2") + t.FailNow() + return + } + try++ + t.Logf("Attempt #%d on getWorkflowRunHandler for run 2", try) + uri := router.GetRoute("GET", api.getWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + "number": "2", + }) + req := assets.NewAuthentifiedRequest(t, u, jwt, "GET", uri, nil) + rec := httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + var wkfRun sdk.WorkflowRun + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wkfRun)) + if wkfRun.Status != sdk.StatusBuilding && wkfRun.RootRun().Stages[0].Status == sdk.StatusWaiting { + t.Logf("Workflow run status: %s", wkfRun.Status) + continue + } + + require.Equal(t, sdk.StatusBuilding, wkfRun.Status) + require.Equal(t, sdk.StatusWaiting, wkfRun.RootRun().Stages[0].Status, "Stop a previous workflow run should have release the mutex and trigger the second run, status of the stage should change for empty string to waiting") + break + } + + // Run 3 should still be locked + try = 0 + for { + if try > 10 { + t.Logf("Maximum attempts reached on getWorkflowRunHandler for run 3") + t.FailNow() + return + } + try++ + t.Logf("Attempt #%d on getWorkflowRunHandler for run 3", try) + uri := router.GetRoute("GET", api.getWorkflowRunHandler, map[string]string{ + "key": proj.Key, + "permWorkflowName": wkf.Name, + "number": "3", + }) + req := assets.NewAuthentifiedRequest(t, u, jwt, "GET", uri, nil) + rec := httptest.NewRecorder() + router.Mux.ServeHTTP(rec, req) + require.Equal(t, 200, rec.Code) + + var wkfRun sdk.WorkflowRun + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wkfRun)) + if wkfRun.Status != sdk.StatusBuilding { + t.Logf("Workflow run status: %s", wkfRun.Status) + continue + } + + require.Equal(t, sdk.StatusBuilding, wkfRun.Status) + require.Equal(t, 2, len(wkfRun.Infos)) + require.Equal(t, sdk.MsgWorkflowStarting.ID, wkfRun.Infos[0].Message.ID) + require.Equal(t, sdk.MsgWorkflowNodeMutex.ID, wkfRun.Infos[1].Message.ID) + require.Equal(t, "", wkfRun.RootRun().Stages[0].Status) + break + } +} + func Test_postWorkflowRunHandlerHook(t *testing.T) { api, db, router := newTestAPI(t)