Skip to content

Commit

Permalink
Added auth validation at the end of a workflow to see if it's functio…
Browse files Browse the repository at this point in the history
…nal or not
  • Loading branch information
frikky committed Jun 25, 2024
1 parent 0d82a69 commit 9961fe4
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 2 deletions.
8 changes: 8 additions & 0 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,7 @@ func SetWorkflowExecution(ctx context.Context, workflowExecution WorkflowExecuti
return nil
}


// Deleting cache so that listing can work well
DeleteCache(ctx, fmt.Sprintf("%s_%s", nameKey, workflowExecution.WorkflowId))
DeleteCache(ctx, fmt.Sprintf("%s_%s_50", nameKey, workflowExecution.WorkflowId))
Expand All @@ -1102,6 +1103,12 @@ func SetWorkflowExecution(ctx context.Context, workflowExecution WorkflowExecuti
}
}

if newexec.Status == "FINISHED" || newexec.Status == "ABORTED" {
// Handles stat updates
go checkExecutionStatus(ctx, *newexec)

}

// New struct, to not add body, author etc
//log.Printf("[DEBUG][%s] Adding execution to database, not just cache. Workflow: %s (%s)", workflowExecution.ExecutionId, workflowExecution.Workflow.Name, workflowExecution.Workflow.ID)
if project.DbType == "opensearch" {
Expand Down Expand Up @@ -9553,6 +9560,7 @@ func GetAllFiles(ctx context.Context, orgId, namespace string) ([]File, error) {
return files, nil
}

// Gets a specific auth for an org
func GetWorkflowAppAuthDatastore(ctx context.Context, id string) (*AppAuthenticationStorage, error) {
nameKey := "workflowappauth"
cacheKey := fmt.Sprintf("%s_%s", nameKey, id)
Expand Down
2 changes: 1 addition & 1 deletion oauth2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3834,7 +3834,7 @@ func RunOauth2Request(ctx context.Context, user User, appAuth AppAuthenticationS
requestRefreshUrl := fmt.Sprintf("%s", refreshUrl)
refreshData := fmt.Sprintf("grant_type=refresh_token&refresh_token=%s&scope=%s&client_id=%s&client_secret=%s", refreshToken, strings.Replace(requestData.Scope, " ", "%20", -1), requestData.ClientId, requestData.ClientSecret)

log.Printf("[DEBUG] Refresh URL: %s?%s", requestRefreshUrl, refreshData)
//log.Printf("[DEBUG] Refresh URL: %s?%s", requestRefreshUrl, refreshData)

req, err := http.NewRequest(
"POST",
Expand Down
148 changes: 147 additions & 1 deletion shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -7880,14 +7880,18 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
// ID first, then name + version
// If it can't find param, it will swap it over farther down
for _, app := range workflowapps {
if app.ID == "" {
break
}

if app.ID == action.AppID {
curapp = app
break
}
}

if curapp.ID == "" && action.AppID != "integration" {
log.Printf("[WARNING] Didn't find the App ID for %s", action.AppID)
log.Printf("[WARNING] Didn't find the App ID for action %s '%s'", action.Label, action.AppID)
for _, app := range workflowapps {
if app.ID == action.AppID {
curapp = app
Expand Down Expand Up @@ -27820,3 +27824,145 @@ func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
resp.WriteHeader(200)
resp.Write(body)
}

// Updates statuses in relevant areas according to what happened in the workflow run
func checkExecutionStatus(ctx context.Context, exec WorkflowExecution) {

// Check if this is already done
if exec.Status != "FINISHED" && exec.Status != "ABORTED" {
return
}

// FIXME: Skipping subexecs, as they are usually not relevant by themselves
if len(exec.ExecutionParent) > 0 {
return
}

// Create cache as to whether this has been ran in the last minute
cacheKey := fmt.Sprintf("execstatus_%s", exec.ExecutionId)
_, err := GetCache(ctx, cacheKey)
if err == nil {
//log.Printf("[DEBUG][%s] Execution status already checked", exec.ExecutionId)
return
}

// Set cache for 1 minute
SetCache(ctx, cacheKey, []byte{1}, 1)


log.Printf("\n\n[DEBUG][%s] Running status fixing for workflow %#v to see if auth + workflow(s) are functional. Results: %d\n\n", exec.ExecutionId, exec.Workflow.ID, len(exec.Results))
orgId := exec.ExecutionOrg
allAuth, err := GetAllWorkflowAppAuth(ctx, orgId)
if err != nil {
log.Printf("[ERROR] Failed getting all auths for org during stat checks %s: %s", orgId, err)
return
}

workflow, err := GetWorkflow(ctx, exec.Workflow.ID)
if err != nil {
log.Printf("[ERROR] Failed getting real workflow for %s: %s", exec.Workflow.ID, err)
return
}

_ = allAuth

handledAuth := []string{}
for _, result := range exec.Results {
// FIXME: Skipping anything that outright fails right now
if result.Status != "SUCCESS" {
continue
}

found := false
foundAction := Action{}
for _, action := range workflow.Actions {
if action.ID != result.Action.ID {
continue
}

// Check if this is an authentication action
if action.AuthenticationId == "" {
break
}

found = true
foundAction = action
break
}

if !found {
continue
}

if ArrayContains(handledAuth, foundAction.AuthenticationId) {
continue
}

unmarshalledHttp := HTTPOutput{}
err := json.Unmarshal([]byte(result.Result), &unmarshalledHttp)
if err != nil {
log.Printf("[ERROR] Failed unmarshalling http result for %s: %s", result.Action.Label, err)
continue
}

isValid := false
if unmarshalledHttp.Success == true {
if unmarshalledHttp.Status >= 200 && unmarshalledHttp.Status < 300 {
isValid = true
}
}

log.Printf("[DEBUG][%s] Checking result for %s", exec.ExecutionId, result.Action.Label)
handledAuth = append(handledAuth, foundAction.AuthenticationId)

for _, auth := range allAuth {
if auth.Id != foundAction.AuthenticationId {
continue
}

authUpdated := false
// Check if the auth is still valid
if !isValid {
// Check if existing is valid or not
// if auth.Validation.V == false {
// //log.Printf("[DEBUG] Auth %s is already invalid", auth.Id)
if auth.Validation.Valid {
auth.Validation.Valid = false

authUpdated = true
}

// Making sure it's set once, with tests
if auth.Validation.ChangedAt == 0 {
authUpdated = true
}
} else {
// New is valid if here. If already valid, do nothing
if !auth.Validation.Valid {
auth.Validation.Valid = true
authUpdated = true
}
}

if authUpdated {
timenow := time.Now().Unix() * 1000

auth.Validation.ChangedAt = timenow
if auth.Validation.Valid {
auth.Validation.LastValid = timenow
}

auth.Validation.WorkflowId = workflow.ID
auth.Validation.ExecutionId = exec.ExecutionId
auth.Validation.NodeId = result.Action.ID

err = SetWorkflowAppAuthDatastore(ctx, auth, auth.Id)
if err != nil {
log.Printf("[ERROR] Failed updating auth at end of workflow run %s: %s", auth.Id, err)
} else {
log.Printf("[DEBUG] Updated auth %s for workflow %s", auth.Id, workflow.ID)
}
}
}
}
}
15 changes: 15 additions & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,20 @@ type AppAuthenticationGroup struct {
AppAuths []AppAuthenticationStorage `json:"app_auths" datastore:"app_auths,noindex"`
}

type AuthValidation struct {
Valid bool `json:"valid" datastore:"valid"`
ChangedAt int64 `json:"changed_at" datastore:"changed_at"`

LastValid int64 `json:"last_valid" datastore:"last_valid"`

// For the last update, which did it
WorkflowId string `json:"workflow_id" datastore:"workflow_id"`
ExecutionId string `json:"execution_id" datastore:"execution_id"`
NodeId string `json:"node_id" datastore:"node_id"`
}

type AppAuthenticationStorage struct {

Active bool `json:"active" datastore:"active"`
Label string `json:"label" datastore:"label"`
Id string `json:"id" datastore:"id"`
Expand All @@ -1448,6 +1461,8 @@ type AppAuthenticationStorage struct {

Environment string `json:"environment" datastore:"environment"` // In case an auth should ALWAYS be mapped to an environment. Can help out with Oauth2 refresh (e.g. running partially on cloud and partially onprem), as well as for KMS. For now ONLY KMS has a frontend.
SuborgDistributed bool `json:"suborg_distributed" datastore:"suborg_distributed"` // Decides if it's distributed to suborgs or not

Validation AuthValidation `json:"validation" datastore:"validation"`
}

type PasswordChange struct {
Expand Down

0 comments on commit 9961fe4

Please sign in to comment.