Skip to content

Commit

Permalink
Added extra subflow execution checks
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed May 19, 2021
1 parent 923344f commit 13d3848
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 18 deletions.
157 changes: 139 additions & 18 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5197,6 +5197,130 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
ExecutionVariable: actionResult.Action.ExecutionVariable,
}

// Fills in data from subflows, whether they're loops or not
if actionResult.Status == "SUCCESS" && actionResult.Action.AppName == "shuffle-subflow" {
runCheck := false
for _, param := range actionResult.Action.Parameters {
if param.Name == "check_result" {
log.Printf("RESULT: %#v", param)
if param.Value == "true" {
runCheck = true
}

break
}
}

//log.Printf("\n\nRUNCHECK: %#v\n\n", runCheck)
if runCheck {
log.Printf("[INFO] Validating subflow result in workflow %s", workflowExecution.ExecutionId)
type SubflowData struct {
Success bool `json:"success"`
ExecutionId string `json:"execution_id"`
Authorization string `json:"authorization"`
Result string `json:"result"`
}

// WAY lower timeout in cloud
// Should probably change it for enterprise customers?
// Idk how to handle this in cloud yet.
subflowTimeout := 180
if project.Environment == "cloud" {
subflowTimeout = 15
}

subflowResult := SubflowData{}
subflowResults := []SubflowData{}
err = json.Unmarshal([]byte(actionResult.Result), &subflowResult)
if err != nil {
subflowResults = []SubflowData{}
err = json.Unmarshal([]byte(actionResult.Result), &subflowResults)
if err == nil {
log.Printf("Should get data for %d subflow executions", len(subflowResults))
count := 0
updated := false
for {
time.Sleep(3 * time.Second)

finished := 0
for subflowIndex, subflowResult := range subflowResults {
if !subflowResult.Success || len(subflowResult.Result) != 0 {
finished += 1
continue
}

workflowExecution, err := GetWorkflowExecution(ctx, subflowResult.ExecutionId)
if err != nil {
log.Printf("[WARNING] Error getting subflow data: %s", err)
} else {
//log.Printf("Results: %d, status: %s", len(workflowExecution.Results), workflowExecution.Status)
if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" {
subflowResults[subflowIndex].Result = workflowExecution.Result
updated = true
finished += 1
}
}
}

if finished == len(subflowResults) {
break
}

if count >= subflowTimeout/3 {
break
}

count += 1
}

if updated {
newJson, err := json.Marshal(subflowResults)
if err == nil {
actionResult.Result = string(newJson)
} else {
log.Printf("[WARNING] Failed marshalling subflowresultS: %s", err)
}
}
}
}

if err == nil && subflowResult.Success == true && len(subflowResult.ExecutionId) > 0 {
log.Printf("Should get data for subflow execution %s", subflowResult.ExecutionId)
count := 0
for {
time.Sleep(3 * time.Second)
workflowExecution, err := GetWorkflowExecution(ctx, subflowResult.ExecutionId)
if err != nil {
log.Printf("[WARNING] Error getting subflow data: %s", err)
} else {
//log.Printf("Results: %d, status: %s", len(workflowExecution.Results), workflowExecution.Status)
if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" {
subflowResult.Result = workflowExecution.Result
break
}
}

if count >= subflowTimeout/3 {
break
}

count += 1
}
}

if len(subflowResult.Result) > 0 {
newJson, err := json.Marshal(subflowResult)
if err == nil {
actionResult.Result = string(newJson)
} else {
log.Printf("[WARNING] Failed marshalling subflowresult: %s", err)
}
}
} else {
log.Printf("[WARNING] Skipping subresult check!")
}
}

if actionResult.Status == "ABORTED" || actionResult.Status == "FAILURE" {
newResults := []ActionResult{}
childNodes := []string{}
Expand Down Expand Up @@ -5348,6 +5472,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
if trigger.AppName == "Shuffle Workflow" {
curAction.AppName = "shuffle-subflow"
}

break
}
}
Expand Down Expand Up @@ -5499,7 +5624,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
workflowExecution.Results = append(workflowExecution.Results, actionResult)
}
} else {
log.Printf("[INFO] Setting value of %s (%s) in workflow %s to %s", actionResult.Action.Label, actionResult.Action.ID, workflowExecution.ExecutionId, actionResult.Status)
log.Printf("[INFO] Setting value of %s (INIT - %s) in workflow %s to %s", actionResult.Action.Label, actionResult.Action.ID, workflowExecution.ExecutionId, actionResult.Status)
workflowExecution.Results = append(workflowExecution.Results, actionResult)
}

Expand Down Expand Up @@ -6127,7 +6252,7 @@ func ValidateSwagger(resp http.ResponseWriter, request *http.Request) {
}

// Recursively finds child nodes inside sub workflows
func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigger Trigger) ([]Action, []Branch, string) {
func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigger Trigger, lastTriggerName string) ([]Action, []Branch, string) {
if execution.ExecutionOrg == "" {
execution.ExecutionOrg = execution.Workflow.OrgId
}
Expand Down Expand Up @@ -6174,7 +6299,7 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
}

//childNodes = FindChildNodes(workflowExecution, actionResult.Action.ID)
log.Printf("FIND CHILDNODES OF %s", workflowAction)
log.Printf("FIND CHILDNODES OF STARTNODE %s", workflowAction)
workflowExecution := WorkflowExecution{
Workflow: *workflow,
}
Expand Down Expand Up @@ -6207,7 +6332,8 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
for actionIndex, action := range newActions {
if lastNode == action.ID {
//actions[actionIndex].Name = trigger.Name
newActions[actionIndex].Label = trigger.Label
newActions[actionIndex].Label = lastTriggerName
//trigger.Label
found = true
}
}
Expand All @@ -6220,7 +6346,7 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
if len(newActions) == len(childNodes) {
return newActions, branches, lastNode
} else {
log.Printf("[WARNING] Bad length of actions and nodes in subflow (subsubflow?): %d vs %d", len(newActions), len(childNodes))
log.Printf("\n\n[WARNING] Bad length of actions and nodes in subflow (subsubflow?): %d vs %d", len(newActions), len(childNodes))

// Adding information about triggers if subflow
changed := false
Expand All @@ -6241,8 +6367,9 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
}

if replaceActions {
replacementNodes, newBranches, lastnode := GetReplacementNodes(ctx, workflowExecution, trigger)
replacementNodes, newBranches, lastNode := GetReplacementNodes(ctx, workflowExecution, trigger, lastTriggerName)
log.Printf("SUB REPLACEMENTS: %d, %d", len(replacementNodes), len(newBranches))
log.Printf("\n\nNEW LASTNODE: %s\n\n", lastNode)
if len(replacementNodes) > 0 {
//workflowExecution.Workflow.Actions = append(workflowExecution.Workflow.Actions, action)

Expand All @@ -6260,16 +6387,9 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
}

if !found {
action.SubAction = true
newActions = append(newActions, action)
}

// Check if it's already set to have a value
//for resultIndex, result := range defaultResults {
// if result.Action.ID == action.ID {
// defaultResults = append(defaultResults[:resultIndex], defaultResults[resultIndex+1:]...)
// break
// }
//}
}

for _, branch := range newBranches {
Expand All @@ -6282,11 +6402,13 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
if branch.DestinationID == trigger.ID {
log.Printf("REPLACE DESTINATION WITH %s!!", workflowAction)
workflowExecution.Workflow.Branches[branchIndex].DestinationID = workflowAction
branches = append(branches, workflowExecution.Workflow.Branches[branchIndex])
}

if branch.SourceID == trigger.ID {
log.Printf("REPLACE SOURCE WITH LASTNODE %s!!", lastnode)
workflowExecution.Workflow.Branches[branchIndex].SourceID = lastnode
log.Printf("REPLACE SOURCE WITH LASTNODE %s!!", lastNode)
workflowExecution.Workflow.Branches[branchIndex].SourceID = lastNode
branches = append(branches, workflowExecution.Workflow.Branches[branchIndex])
}
}

Expand All @@ -6295,15 +6417,14 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
workflow.Triggers = append(workflow.Triggers[:triggerIndex], workflow.Triggers[triggerIndex+1:]...)
changed = true
}

log.Printf("NEW ACTION LENGTH %d, Triggers: %d", len(newActions), len(workflowExecution.Workflow.Triggers))
}
}
}

}
}

log.Printf("NEW ACTION LENGTH %d, Branches: %d. LASTNODE: %s\n\n", len(newActions), len(branches), lastNode)
if changed {
return newActions, branches, lastNode
}
Expand Down
1 change: 1 addition & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ type Action struct {
AuthNotRequired bool `json:"auth_not_required,omitempty" datastore:"auth_not_required" yaml:"auth_not_required"`
Category string `json:"category" datastore:"category"`
ReferenceUrl string `json:"reference_url" datastore:"reference_url"`
SubAction bool `json:"sub_action" datastore:"sub_action"`
}

// Added environment for location to execute
Expand Down

0 comments on commit 13d3848

Please sign in to comment.