diff --git a/shared.go b/shared.go index 7f0e819..60dc894 100644 --- a/shared.go +++ b/shared.go @@ -5197,6 +5197,130 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut ExecutionVariable: actionResult.Action.ExecutionVariable, } + // Fills in data from subflows, whether they're loops or not + if actionResult.Status == "SUCCESS" && actionResult.Action.AppName == "shuffle-subflow" { + runCheck := false + for _, param := range actionResult.Action.Parameters { + if param.Name == "check_result" { + log.Printf("RESULT: %#v", param) + if param.Value == "true" { + runCheck = true + } + + break + } + } + + //log.Printf("\n\nRUNCHECK: %#v\n\n", runCheck) + if runCheck { + log.Printf("[INFO] Validating subflow result in workflow %s", workflowExecution.ExecutionId) + type SubflowData struct { + Success bool `json:"success"` + ExecutionId string `json:"execution_id"` + Authorization string `json:"authorization"` + Result string `json:"result"` + } + + // WAY lower timeout in cloud + // Should probably change it for enterprise customers? + // Idk how to handle this in cloud yet. + subflowTimeout := 180 + if project.Environment == "cloud" { + subflowTimeout = 15 + } + + subflowResult := SubflowData{} + subflowResults := []SubflowData{} + err = json.Unmarshal([]byte(actionResult.Result), &subflowResult) + if err != nil { + subflowResults = []SubflowData{} + err = json.Unmarshal([]byte(actionResult.Result), &subflowResults) + if err == nil { + log.Printf("Should get data for %d subflow executions", len(subflowResults)) + count := 0 + updated := false + for { + time.Sleep(3 * time.Second) + + finished := 0 + for subflowIndex, subflowResult := range subflowResults { + if !subflowResult.Success || len(subflowResult.Result) != 0 { + finished += 1 + continue + } + + workflowExecution, err := GetWorkflowExecution(ctx, subflowResult.ExecutionId) + if err != nil { + log.Printf("[WARNING] Error getting subflow data: %s", err) + } else { + //log.Printf("Results: %d, status: %s", len(workflowExecution.Results), workflowExecution.Status) + if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" { + subflowResults[subflowIndex].Result = workflowExecution.Result + updated = true + finished += 1 + } + } + } + + if finished == len(subflowResults) { + break + } + + if count >= subflowTimeout/3 { + break + } + + count += 1 + } + + if updated { + newJson, err := json.Marshal(subflowResults) + if err == nil { + actionResult.Result = string(newJson) + } else { + log.Printf("[WARNING] Failed marshalling subflowresultS: %s", err) + } + } + } + } + + if err == nil && subflowResult.Success == true && len(subflowResult.ExecutionId) > 0 { + log.Printf("Should get data for subflow execution %s", subflowResult.ExecutionId) + count := 0 + for { + time.Sleep(3 * time.Second) + workflowExecution, err := GetWorkflowExecution(ctx, subflowResult.ExecutionId) + if err != nil { + log.Printf("[WARNING] Error getting subflow data: %s", err) + } else { + //log.Printf("Results: %d, status: %s", len(workflowExecution.Results), workflowExecution.Status) + if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" { + subflowResult.Result = workflowExecution.Result + break + } + } + + if count >= subflowTimeout/3 { + break + } + + count += 1 + } + } + + if len(subflowResult.Result) > 0 { + newJson, err := json.Marshal(subflowResult) + if err == nil { + actionResult.Result = string(newJson) + } else { + log.Printf("[WARNING] Failed marshalling subflowresult: %s", err) + } + } + } else { + log.Printf("[WARNING] Skipping subresult check!") + } + } + if actionResult.Status == "ABORTED" || actionResult.Status == "FAILURE" { newResults := []ActionResult{} childNodes := []string{} @@ -5348,6 +5472,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut if trigger.AppName == "Shuffle Workflow" { curAction.AppName = "shuffle-subflow" } + break } } @@ -5499,7 +5624,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut workflowExecution.Results = append(workflowExecution.Results, actionResult) } } else { - log.Printf("[INFO] Setting value of %s (%s) in workflow %s to %s", actionResult.Action.Label, actionResult.Action.ID, workflowExecution.ExecutionId, actionResult.Status) + log.Printf("[INFO] Setting value of %s (INIT - %s) in workflow %s to %s", actionResult.Action.Label, actionResult.Action.ID, workflowExecution.ExecutionId, actionResult.Status) workflowExecution.Results = append(workflowExecution.Results, actionResult) } @@ -6127,7 +6252,7 @@ func ValidateSwagger(resp http.ResponseWriter, request *http.Request) { } // Recursively finds child nodes inside sub workflows -func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigger Trigger) ([]Action, []Branch, string) { +func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigger Trigger, lastTriggerName string) ([]Action, []Branch, string) { if execution.ExecutionOrg == "" { execution.ExecutionOrg = execution.Workflow.OrgId } @@ -6174,7 +6299,7 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg } //childNodes = FindChildNodes(workflowExecution, actionResult.Action.ID) - log.Printf("FIND CHILDNODES OF %s", workflowAction) + log.Printf("FIND CHILDNODES OF STARTNODE %s", workflowAction) workflowExecution := WorkflowExecution{ Workflow: *workflow, } @@ -6207,7 +6332,8 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg for actionIndex, action := range newActions { if lastNode == action.ID { //actions[actionIndex].Name = trigger.Name - newActions[actionIndex].Label = trigger.Label + newActions[actionIndex].Label = lastTriggerName + //trigger.Label found = true } } @@ -6220,7 +6346,7 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg if len(newActions) == len(childNodes) { return newActions, branches, lastNode } else { - log.Printf("[WARNING] Bad length of actions and nodes in subflow (subsubflow?): %d vs %d", len(newActions), len(childNodes)) + log.Printf("\n\n[WARNING] Bad length of actions and nodes in subflow (subsubflow?): %d vs %d", len(newActions), len(childNodes)) // Adding information about triggers if subflow changed := false @@ -6241,8 +6367,9 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg } if replaceActions { - replacementNodes, newBranches, lastnode := GetReplacementNodes(ctx, workflowExecution, trigger) + replacementNodes, newBranches, lastNode := GetReplacementNodes(ctx, workflowExecution, trigger, lastTriggerName) log.Printf("SUB REPLACEMENTS: %d, %d", len(replacementNodes), len(newBranches)) + log.Printf("\n\nNEW LASTNODE: %s\n\n", lastNode) if len(replacementNodes) > 0 { //workflowExecution.Workflow.Actions = append(workflowExecution.Workflow.Actions, action) @@ -6260,16 +6387,9 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg } if !found { + action.SubAction = true newActions = append(newActions, action) } - - // Check if it's already set to have a value - //for resultIndex, result := range defaultResults { - // if result.Action.ID == action.ID { - // defaultResults = append(defaultResults[:resultIndex], defaultResults[resultIndex+1:]...) - // break - // } - //} } for _, branch := range newBranches { @@ -6282,11 +6402,13 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg if branch.DestinationID == trigger.ID { log.Printf("REPLACE DESTINATION WITH %s!!", workflowAction) workflowExecution.Workflow.Branches[branchIndex].DestinationID = workflowAction + branches = append(branches, workflowExecution.Workflow.Branches[branchIndex]) } if branch.SourceID == trigger.ID { - log.Printf("REPLACE SOURCE WITH LASTNODE %s!!", lastnode) - workflowExecution.Workflow.Branches[branchIndex].SourceID = lastnode + log.Printf("REPLACE SOURCE WITH LASTNODE %s!!", lastNode) + workflowExecution.Workflow.Branches[branchIndex].SourceID = lastNode + branches = append(branches, workflowExecution.Workflow.Branches[branchIndex]) } } @@ -6295,8 +6417,6 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg workflow.Triggers = append(workflow.Triggers[:triggerIndex], workflow.Triggers[triggerIndex+1:]...) changed = true } - - log.Printf("NEW ACTION LENGTH %d, Triggers: %d", len(newActions), len(workflowExecution.Workflow.Triggers)) } } } @@ -6304,6 +6424,7 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg } } + log.Printf("NEW ACTION LENGTH %d, Branches: %d. LASTNODE: %s\n\n", len(newActions), len(branches), lastNode) if changed { return newActions, branches, lastNode } diff --git a/structs.go b/structs.go index d8690e6..5338003 100644 --- a/structs.go +++ b/structs.go @@ -630,6 +630,7 @@ type Action struct { AuthNotRequired bool `json:"auth_not_required,omitempty" datastore:"auth_not_required" yaml:"auth_not_required"` Category string `json:"category" datastore:"category"` ReferenceUrl string `json:"reference_url" datastore:"reference_url"` + SubAction bool `json:"sub_action" datastore:"sub_action"` } // Added environment for location to execute