diff --git a/db-connector.go b/db-connector.go index fe4b28e..b4283ef 100755 --- a/db-connector.go +++ b/db-connector.go @@ -2530,7 +2530,6 @@ func GetAllWorkflowsByQuery(ctx context.Context, user User) ([]Workflow, error) log.Printf("Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -2538,8 +2537,6 @@ func GetAllWorkflowsByQuery(ctx context.Context, user User) ([]Workflow, error) cursorStr = nextStr query = query.Start(nextCursor) - //cursorStr = nextCursor - //break } } } @@ -4507,7 +4504,6 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) { log.Printf("Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -4515,8 +4511,6 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) { cursorStr = nextStr query = query.Start(nextCursor) - //cursorStr = nextCursor - //break } if len(allApps) > maxLen { @@ -4592,7 +4586,6 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) { log.Printf("Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -4600,8 +4593,6 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) { cursorStr = nextStr query = query.Start(nextCursor) - //cursorStr = nextCursor - //break } if len(allApps) > maxLen { @@ -5070,7 +5061,6 @@ func GetAllWorkflowApps(ctx context.Context, maxLen int, depth int) ([]WorkflowA log.Printf("Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -5078,8 +5068,6 @@ func GetAllWorkflowApps(ctx context.Context, maxLen int, depth int) ([]WorkflowA cursorStr = nextStr query = query.Start(nextCursor) - //cursorStr = nextCursor - //break } if len(allApps) > maxLen && maxLen != 0 { @@ -5793,7 +5781,6 @@ func ListWorkflowRevisions(ctx context.Context, originalId string) ([]Workflow, log.Printf("Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -5801,8 +5788,6 @@ func ListWorkflowRevisions(ctx context.Context, originalId string) ([]Workflow, cursorStr = nextStr query = query.Start(nextCursor) - //cursorStr = nextCursor - //break } } } @@ -7519,7 +7504,6 @@ func GetUnfinishedExecutions(ctx context.Context, workflowId string) ([]Workflow log.Printf("[WARNING] Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -7540,6 +7524,295 @@ func GetUnfinishedExecutions(ctx context.Context, workflowId string) ([]Workflow return executions, nil } +func GetAllWorkflowExecutionsV2(ctx context.Context, workflowId string, amount int, inputcursor string) ([]WorkflowExecution, string, error) { + index := "workflowexecution" + + cacheKey := fmt.Sprintf("%s_%s_%s", index, inputcursor, workflowId) + var executions []WorkflowExecution + var err error + totalMaxSize := 11184810 + + + cursor := "" + if project.DbType == "opensearch" { + var buf bytes.Buffer + query := map[string]interface{}{ + "size": amount, + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + { + "match": map[string]interface{}{ + "workflow_id": workflowId, + }, + }, + }, + }, + }, + "sort": map[string]interface{}{ + "started_at": map[string]interface{}{ + "order": "desc", + }, + }, + } + if err := json.NewEncoder(&buf).Encode(query); err != nil { + log.Printf("[WARNING] Error encoding executions query: %s", err) + return executions, cursor, err + } + + // Perform the search request. + res, err := project.Es.Search( + project.Es.Search.WithContext(ctx), + project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(index))), + project.Es.Search.WithBody(&buf), + project.Es.Search.WithTrackTotalHits(true), + ) + if err != nil { + log.Printf("[ERROR] Error getting response from Opensearch (get workflow executions): %s", err) + return executions, cursor, err + } + + defer res.Body.Close() + if res.StatusCode == 404 { + return executions, cursor, nil + } + + defer res.Body.Close() + if res.IsError() { + var e map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&e); err != nil { + log.Printf("[WARNING] Error parsing the response body: %s", err) + return executions, cursor, err + } else { + // Print the response status and error information. + log.Printf("[%s] %s: %s", + res.Status(), + e["error"].(map[string]interface{})["type"], + e["error"].(map[string]interface{})["reason"], + ) + } + } + + if res.StatusCode != 200 && res.StatusCode != 201 { + return executions, cursor, errors.New(fmt.Sprintf("Bad statuscode: %d", res.StatusCode)) + } + + respBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return executions, cursor, err + } + + wrapped := ExecutionSearchWrapper{} + err = json.Unmarshal(respBody, &wrapped) + if err != nil { + return executions, cursor, err + } + + executions = []WorkflowExecution{} + for _, hit := range wrapped.Hits.Hits { + if hit.Source.WorkflowId == workflowId || hit.Source.Workflow.ID == workflowId { + executions = append(executions, hit.Source) + } + } + + } else { + + query := datastore.NewQuery(index).Filter("workflow_id =", workflowId).Order("-started_at").Limit(5) + if inputcursor != "" { + outputcursor, err := datastore.DecodeCursor(inputcursor) + if err != nil { + log.Printf("[WARNING] Error decoding cursor: %s", err) + return executions, "", err + } + + query = query.Start(outputcursor) + } + + cursorStr := "" + for { + it := project.Dbclient.Run(ctx, query) + + for { + innerWorkflow := WorkflowExecution{} + _, err := it.Next(&innerWorkflow) + if err != nil { + //log.Printf("[WARNING] Error getting workflow executions: %s", err) + break + } + + executions = append(executions, innerWorkflow) + } + + if err != iterator.Done { + //log.Printf("Breaking due to no more iterator") + //log.Printf("[INFO] Failed fetching results: %v", err) + //break + } + + // This is a way to load as much data as we want, and the frontend will load the actual result for us + executionmarshal, err := json.Marshal(executions) + if err == nil { + if len(executionmarshal) > totalMaxSize { + // Reducing size + + for execIndex, execution := range executions { + // Making sure the first 5 are "always" proper + if execIndex < 5 { + continue + } + + newResults := []ActionResult{} + + newActions := []Action{} + for _, action := range execution.Workflow.Actions { + newAction := Action{ + Name: action.Name, + ID: action.ID, + AppName: action.AppName, + AppID: action.AppID, + } + + newActions = append(newActions, newAction) + } + + executions[execIndex].Workflow = Workflow{ + Name: execution.Workflow.Name, + ID: execution.Workflow.ID, + Triggers: execution.Workflow.Triggers, + Actions: newActions, + } + + for _, result := range execution.Results { + result.Result = "Result was too large to load. Full Execution needs to be loaded individually for this execution. Click \"Explore execution\" in the UI to see it in detail." + result.Action = Action{ + Name: result.Action.Name, + ID: result.Action.ID, + AppName: result.Action.AppName, + AppID: result.Action.AppID, + LargeImage: result.Action.LargeImage, + } + + newResults = append(newResults, result) + } + + executions[execIndex].ExecutionArgument = "too large" + executions[execIndex].Results = newResults + } + + executionmarshal, err = json.Marshal(executions) + if err == nil && len(executionmarshal) > totalMaxSize { + log.Printf("Length breaking (2): %d", len(executionmarshal)) + break + } + } + } + + // expected to get here + if len(executions) >= amount { + //log.Printf("[INFO] Breaking due to executions larger than amount (%d/%d)", len(executions), amount) + // Get next cursor + nextCursor, err := it.Cursor() + if err != nil { + log.Printf("[WARNING] Cursorerror: %s", err) + } else { + cursor = fmt.Sprintf("%s", nextCursor) + } + + break + } + + // Get the cursor for the next page of results. + nextCursor, err := it.Cursor() + if err != nil { + log.Printf("[WARNING] Cursorerror: %s", err) + break + } else { + nextStr := fmt.Sprintf("%s", nextCursor) + cursor = nextStr + if cursorStr == nextStr { + log.Printf("Breaking due to no new cursor") + + break + } + + cursorStr = nextStr + query = query.Start(nextCursor) + } + } + } + + slice.Sort(executions[:], func(i, j int) bool { + return executions[i].StartedAt > executions[j].StartedAt + }) + + executionmarshal, err := json.Marshal(executions) + if err == nil { + if len(executionmarshal) > totalMaxSize { + // Reducing size + + for execIndex, execution := range executions { + // Making sure the first 5 are "always" proper + if execIndex < 5 { + continue + } + + newResults := []ActionResult{} + + newActions := []Action{} + for _, action := range execution.Workflow.Actions { + newAction := Action{ + Name: action.Name, + ID: action.ID, + AppName: action.AppName, + AppID: action.AppID, + } + + newActions = append(newActions, newAction) + } + + executions[execIndex].Workflow = Workflow{ + Name: execution.Workflow.Name, + ID: execution.Workflow.ID, + Triggers: execution.Workflow.Triggers, + Actions: newActions, + } + + for _, result := range execution.Results { + result.Result = "Result was too large to load. Full Execution needs to be loaded individually for this execution. Click \"Explore execution\" in the UI to see it in detail." + result.Action = Action{ + Name: result.Action.Name, + ID: result.Action.ID, + AppName: result.Action.AppName, + AppID: result.Action.AppID, + LargeImage: result.Action.LargeImage, + } + + newResults = append(newResults, result) + } + + executions[execIndex].ExecutionArgument = "too large" + executions[execIndex].Results = newResults + } + } + } + + if project.CacheDb { + data, err := json.Marshal(executions) + if err != nil { + log.Printf("[WARNING] Failed marshalling update execution cache: %s", err) + return executions, cursor, nil + } + + err = SetCache(ctx, cacheKey, data, 10) + if err != nil { + log.Printf("[WARNING] Failed setting cache executions (%s): %s", workflowId, err) + return executions, cursor, nil + } + } + + return executions, cursor, nil +} + func GetAllWorkflowExecutions(ctx context.Context, workflowId string, amount int) ([]WorkflowExecution, error) { index := "workflowexecution" @@ -7752,7 +8025,6 @@ func GetAllWorkflowExecutions(ctx context.Context, workflowId string, amount int log.Printf("[WARNING] Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { //log.Printf("Breaking due to no new cursor") @@ -8151,7 +8423,6 @@ func GetAppExecutionValues(ctx context.Context, parameterNames, orgId, workflowI log.Printf("Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -8159,9 +8430,6 @@ func GetAppExecutionValues(ctx context.Context, parameterNames, orgId, workflowI cursorStr = nextStr query = query.Start(nextCursor) - //cursorStr = nextCursor - //break - } } } @@ -8875,7 +9143,15 @@ func GetAllCacheKeys(ctx context.Context, orgId string, max int, inputcursor str // Query datastore with pages query := datastore.NewQuery(nameKey).Filter("OrgId =", orgId).Limit(max) - //.Cursor(inputcursor) + if inputcursor != "" { + outputcursor, err := datastore.DecodeCursor(inputcursor) + if err != nil { + log.Printf("[WARNING] Error decoding cursor: %s", err) + return cacheKeys, "", err + } + + query = query.Start(outputcursor) + } // Skip page in query cursorStr := inputcursor @@ -8899,13 +9175,25 @@ func GetAllCacheKeys(ctx context.Context, orgId string, max int, inputcursor str //break } + if len(cacheKeys) >= max { + // Get next cursor and set it as the new cursor + + nextCursor, err := it.Cursor() + if err != nil { + log.Printf("[WARNING] Cursorerror for cache: %s", err) + } else { + cursor = fmt.Sprintf("%s", nextCursor) + } + + break + } + // Get the cursor for the next page of results. nextCursor, err := it.Cursor() if err != nil { log.Printf("Cursorerror: %s", err) break } else { - //log.Printf("NEXTCURSOR: %s", nextCursor) nextStr := fmt.Sprintf("%s", nextCursor) if cursorStr == nextStr { break @@ -8919,9 +9207,6 @@ func GetAllCacheKeys(ctx context.Context, orgId string, max int, inputcursor str //break } - if len(cacheKeys) >= max { - break - } } log.Printf("[INFO] Got %d cacheKeys for org %s (datastore)", len(cacheKeys), orgId) diff --git a/shared.go b/shared.go index 500d70a..78507ba 100755 --- a/shared.go +++ b/shared.go @@ -3069,6 +3069,7 @@ func GetWorkflowExecutions(resp http.ResponseWriter, request *http.Request) { return } + //log.Printf("[DEBUG] Found %d executions for workflow %s", len(workflowExecutions), fileId) if len(workflowExecutions) == 0 { @@ -3137,6 +3138,169 @@ func GetWorkflowExecutions(resp http.ResponseWriter, request *http.Request) { resp.Write(newjson) } +func GetWorkflowExecutionsV2(resp http.ResponseWriter, request *http.Request) { + cors := HandleCors(resp, request) + if cors { + return + } + + user, err := HandleApiAuthentication(resp, request) + if err != nil { + log.Printf("[WARNING] Api authentication failed in getting workflow executions: %s", err) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } + + location := strings.Split(request.URL.String(), "/") + + var fileId string + if location[1] == "api" { + if len(location) <= 4 { + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } + + fileId = location[4] + } + + if len(fileId) != 36 { + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Workflow ID when getting workflow executions is not valid"}`)) + return + } + + ctx := GetContext(request) + workflow, err := GetWorkflow(ctx, fileId) + if err != nil { + log.Printf("[WARNING] Failed getting the workflow %s locally (get executions): %s", fileId, err) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } + + if user.Id != workflow.Owner || len(user.Id) == 0 { + if workflow.OrgId == user.ActiveOrg.Id { + log.Printf("[AUDIT] User %s is accessing workflow '%s' (%s) executions as %s (get executions)", user.Username, workflow.Name, workflow.ID, user.Role) + } else if project.Environment == "cloud" && user.Verified == true && user.Active == true && user.SupportAccess == true && strings.HasSuffix(user.Username, "@shuffler.io") { + log.Printf("[AUDIT] Letting verified support admin %s access workflow execs for %s", user.Username, workflow.ID) + } else { + log.Printf("[AUDIT] Wrong user (%s) for workflow %s (get workflow execs)", user.Username, workflow.ID) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } + } + + // Query for the specifci workflowId + //q := datastore.NewQuery("workflowexecution").Filter("workflow_id =", fileId).Order("-started_at").Limit(30) + //q := datastore.NewQuery("workflowexecution").Filter("workflow_id =", fileId) + maxAmount := 100 + top, topOk := request.URL.Query()["top"] + if topOk && len(top) > 0 { + val, err := strconv.Atoi(top[0]) + if err == nil { + maxAmount = val + } + } + + if maxAmount > 1000 { + maxAmount = 1000 + } + + cursor := "" + cursorList, cursorOk := request.URL.Query()["cursor"] + if cursorOk && len(cursorList) > 0 { + cursor = cursorList[0] + } + + log.Printf("[DEBUG] Getting %d executions for workflow %s.", maxAmount, fileId) + + workflowExecutions, newCursor, err := GetAllWorkflowExecutionsV2(ctx, fileId, maxAmount, cursor) + if err != nil { + log.Printf("[WARNING] Failed getting executions for %s", fileId) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } + + if len(workflowExecutions) == 0 { + resp.WriteHeader(200) + resp.Write([]byte("[]")) + return + } + + for index, execution := range workflowExecutions { + newResults := []ActionResult{} + newActions := []Action{} + newTriggers := []Trigger{} + + // Results + for _, result := range execution.Results { + newParams := []WorkflowAppActionParameter{} + for _, param := range result.Action.Parameters { + if param.Configuration || strings.Contains(strings.ToLower(param.Name), "user") || strings.Contains(strings.ToLower(param.Name), "key") || strings.Contains(strings.ToLower(param.Name), "pass") { + param.Value = "" + //log.Printf("FOUND CONFIG: %s!!", param.Name) + } + + newParams = append(newParams, param) + } + + result.Action.Parameters = newParams + newResults = append(newResults, result) + } + + // Actions + for _, action := range execution.Workflow.Actions { + newParams := []WorkflowAppActionParameter{} + for _, param := range action.Parameters { + if param.Configuration || strings.Contains(strings.ToLower(param.Name), "user") || strings.Contains(strings.ToLower(param.Name), "key") || strings.Contains(strings.ToLower(param.Name), "pass") { + param.Value = "" + //log.Printf("FOUND CONFIG: %s!!", param.Name) + } + + newParams = append(newParams, param) + } + + action.Parameters = newParams + newActions = append(newActions, action) + } + + for _, trigger := range execution.Workflow.Triggers { + trigger.LargeImage = "" + trigger.SmallImage = "" + newTriggers = append(newTriggers, trigger) + } + + workflowExecutions[index].Results = newResults + + workflowExecutions[index].Workflow.Actions = newActions + workflowExecutions[index].Workflow.Image = "" + workflowExecutions[index].Workflow.Triggers = newTriggers + + // Would like to omit the whole thing :thinking: + //workflowExecutions[index].Workflow = Workflow{} + } + + newReturn := ExecutionReturn{ + Success: true, + Cursor: newCursor, + Executions: workflowExecutions, + } + + newjson, err := json.Marshal(newReturn) + if err != nil { + resp.WriteHeader(401) + resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow executions"}`))) + return + } + + resp.WriteHeader(200) + resp.Write(newjson) +} + func GetWorkflows(resp http.ResponseWriter, request *http.Request) { cors := HandleCors(resp, request) if cors { @@ -12857,13 +13021,6 @@ func CheckHookAuth(request *http.Request, auth string) error { return nil } - log.Printf("[INFO] Checking hook auth: %s", auth) - // Print headers - for name, headers := range request.Header { - name = strings.ToLower(name) - log.Printf("[INFO] %s = %s", name, headers) - } - authSplit := strings.Split(auth, "\n") for _, line := range authSplit { lineSplit := strings.Split(line, "=") @@ -19680,7 +19837,7 @@ func GetWorkflowSuggestions(ctx context.Context, user User, org *Org, orgUpdated } } - log.Printf("[DEBUG] Inside workflow suggestions. Usecases: %d", usecasesAdded) + //log.Printf("[DEBUG] Inside workflow suggestions. Usecases: %d", usecasesAdded) if usecasesAdded < 3 { log.Printf("[DEBUG] Should check if workflows still are the same amount or not to change priorities") diff --git a/structs.go b/structs.go index fe28f0a..3a71fd6 100755 --- a/structs.go +++ b/structs.go @@ -917,7 +917,7 @@ type WorkflowExecution struct { Result string `json:"result" datastore:"result,noindex"` ProjectId string `json:"project_id" datastore:"project_id"` Locations []string `json:"locations" datastore:"locations"` - Workflow Workflow `json:"workflow" datastore:"workflow,noindex"` + Workflow Workflow `json:"workflow,omitempty" datastore:"workflow,noindex"` Results []ActionResult `json:"results" datastore:"results,noindex"` ExecutionVariables []Variable `json:"execution_variables,omitempty" datastore:"execution_variables,omitempty"` OrgId string `json:"org_id" datastore:"org_id"` @@ -3426,6 +3426,13 @@ type OrborusStats struct { TotalContainers int `json:"total_containers"` } +// Create struct +type ExecutionReturn struct { + Success bool `json:"success"` + Executions []WorkflowExecution `json:"executions"` + Cursor string `json:"cursor"` +} + // Create struct type CacheReturn struct { Success bool `json:"success"`