Skip to content

Commit

Permalink
Merge pull request #111 from yashsinghcodes/WAITING-HALT
Browse files Browse the repository at this point in the history
FIX FOR SUBFLOW HALTING ISSUE
  • Loading branch information
frikky authored Oct 4, 2024
2 parents 5a30d86 + 56d8429 commit df8c722
Showing 1 changed file with 26 additions and 33 deletions.
59 changes: 26 additions & 33 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -11896,7 +11896,6 @@ func sendMailSendgrid(toEmail []string, subject, body string, emailApp bool, Bcc
}
}


parsedBody, err := json.Marshal(newBody)
if err != nil {
log.Printf("[ERROR] Failed to parse JSON in sendmail: %s", err)
Expand Down Expand Up @@ -14064,22 +14063,23 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa
newResults = append(newResults, res)
}

if finishedSubflows == len(newResults) {
log.Printf("[DEBUG][%s] Finished workflow because status of all should be set to finished now", subflowExecutionId)

// Status is used to determine if the current subflow is finished
log.Printf("[INFO][%s] TOTAL FINISHED SUBFLOWS: %d/%d", subflowExecutionId, len(parentSubflowResult), len(newResults))

/*
foundResult.Status = "SUCCESS"
// Can it be if this and also status = "WAITING"?
if len(parentSubflowResult) == finishedSubflows && foundResult.Status != "SUCCESS" && foundResult.Status != "FAILURE" {
log.Printf("[INFO][%s] ALL THE SUBFLOW GOT THE RESULT BACK SO UPATING THE STATUS TO SUCCESS")
foundResult.Status = "SUCCESS"
if foundResult.CompletedAt == 0 {
foundResult.CompletedAt = time.Now().Unix() * 1000
}
ranUpdate = true

if result.CompletedAt == 0 {
result.CompletedAt = time.Now().Unix()*1000
}
*/
sendRequest = true
}

if ranUpdate {

// FIXME: Look into whether this sendRequest can be removed if we want reduce the amount of request
sendRequest = true
baseResultData, err := json.Marshal(newResults)
if err != nil {
Expand Down Expand Up @@ -14583,7 +14583,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut

// Verifying if the userinput should be sent properly or not
if actionResult.Action.Name == "run_userinput" && actionResult.Status != "SKIPPED" {
//log.Printf("\n\n[INFO] Inside userinput default return! Return data: %s", actionResult.Result)
// log.Printf("\n\n[INFO] Inside userinput default return! Return data: %s", actionResult.Result)
actionResult.Status = "WAITING"
actionResult.CompletedAt = time.Now().Unix() * 1000

Expand Down Expand Up @@ -15417,6 +15417,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
}

updateParentRan := false

if len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extraInputs {
//log.Printf("\nIN HERE WITH RESULTS %d vs %d\n", len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extraInputs)
finished := true
Expand All @@ -15441,7 +15442,6 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
//log.Printf("[INFO][%s] Execution in workflow %s finished (not subflow).", workflowExecution.ExecutionId, workflowExecution.Workflow.ID)
} else {
log.Printf("[INFO][%s] SubExecution of parentExecution %s in workflow %s finished (subflow).", workflowExecution.ExecutionId, workflowExecution.ExecutionParent, workflowExecution.Workflow.ID)

}

for actionIndex, action := range workflowExecution.Workflow.Actions {
Expand Down Expand Up @@ -15646,7 +15646,6 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut

// Setting to waiting, as it should be updated by child executions' fill-ins from their result when they finish
workflowExecution.Status = "EXECUTING"

amountFinished := 0
for _, subflowData := range subflowDataList {
if subflowData.ResultSet || len(subflowData.Result) > 0 {
Expand Down Expand Up @@ -20336,24 +20335,24 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
Value: actionLabelParsed,

ActionField: "",
ID: uuid.NewV4().String(),
Name: "source",
Variant: "STATIC_VALUE",
ID: uuid.NewV4().String(),
Name: "source",
Variant: "STATIC_VALUE",
},
Condition: WorkflowAppActionParameter{
Value: "equals",

ID: uuid.NewV4().String(),
Name: "condition",
ID: uuid.NewV4().String(),
Name: "condition",
Variant: "STATIC_VALUE",
},
Destination: WorkflowAppActionParameter{
Value: "true",

ActionField: "",
ID: uuid.NewV4().String(),
Name: "destination",
Variant: "STATIC_VALUE",
ID: uuid.NewV4().String(),
Name: "destination",
Variant: "STATIC_VALUE",
},
}

Expand Down Expand Up @@ -23966,7 +23965,6 @@ func GetExternalClient(baseUrl string) *http.Client {
httpProxy := os.Getenv("SHUFFLE_INTERNAL_HTTP_PROXY")
httpsProxy := os.Getenv("SHUFFLE_INTERNAL_HTTPS_PROXY")


transport := http.DefaultTransport.(*http.Transport)
transport.MaxIdleConnsPerHost = 100
transport.ResponseHeaderTimeout = time.Second * 60
Expand Down Expand Up @@ -24000,7 +23998,6 @@ func GetExternalClient(baseUrl string) *http.Client {
log.Printf("[INFO] Reading self signed certificates from custom dir '%s'", certDir)
}


files, err := os.ReadDir(certDir)
if err == nil && os.Getenv("SHUFFLE_CERT_DIR") != "" {
for _, file := range files {
Expand All @@ -24023,7 +24020,7 @@ func GetExternalClient(baseUrl string) *http.Client {

if (len(httpProxy) > 0 || len(httpsProxy) > 0) && baseUrl != "http://shuffle-backend:5001" {
//client = &http.Client{}
if len(httpProxy) > 0 && httpProxy != "noproxy"{
if len(httpProxy) > 0 && httpProxy != "noproxy" {
log.Printf("[INFO] Running with HTTP proxy %s (env: HTTP_PROXY)", httpProxy)

url_i := url.URL{}
Expand All @@ -24032,7 +24029,7 @@ func GetExternalClient(baseUrl string) *http.Client {
transport.Proxy = http.ProxyURL(url_proxy)
}
}
if len(httpsProxy) > 0 && httpsProxy != "noproxy"{
if len(httpsProxy) > 0 && httpsProxy != "noproxy" {
log.Printf("[INFO] Running with HTTPS proxy %s (env: HTTPS_PROXY)", httpsProxy)

url_i := url.URL{}
Expand All @@ -24042,7 +24039,7 @@ func GetExternalClient(baseUrl string) *http.Client {
}
}
} else {
// keeping this here for now
// keeping this here for now
if len(httpProxy) > 0 && httpProxy != "noproxy" {
log.Printf("[INFO] Running with HTTP proxy %s (env: HTTP_PROXY)", httpProxy)

Expand Down Expand Up @@ -28406,10 +28403,8 @@ func HandleExecutionCacheIncrement(ctx context.Context, execution WorkflowExecut
}
}


// FIXME: Always fails:


func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
if cors {
Expand Down Expand Up @@ -28474,7 +28469,7 @@ func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
continue
}

org, err := GetOrg(ctx, orgId)
org, err := GetOrg(ctx, orgId)
if err != nil {
log.Printf("[WARNING] Failed getting org during parent org loading %s: %s", org.Id, err)
resp.WriteHeader(500)
Expand All @@ -28486,7 +28481,7 @@ func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
if user.Id == orgUser.Id {
user.Role = orgUser.Role
user.ActiveOrg.Id = org.Id
orgUserFound = true
orgUserFound = true
}
}

Expand Down Expand Up @@ -28520,8 +28515,6 @@ func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
}
}



// Access is granted -> get revisions
childWorkflows, err := ListChildWorkflows(ctx, workflow.ID)
if err != nil {
Expand Down

0 comments on commit df8c722

Please sign in to comment.