Skip to content

Commit

Permalink
More health improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Oct 19, 2023
1 parent 058c66d commit e20e3ee
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 45 deletions.
65 changes: 23 additions & 42 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type executionResult struct {
ID string `json:"id"`
}

func updateCache(workflowHealth WorkflowHealth) {
func updateOpsCache(workflowHealth WorkflowHealth) {
cacheKey := fmt.Sprintf("ops-health-check")
ctx := context.Background()

Expand Down Expand Up @@ -490,7 +490,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
return
}

log.Printf("[DEBUG] Force is true. Running health check")
//log.Printf("[DEBUG] Running health check")
userInfo, err := HandleApiAuthentication(resp, request)
if err != nil {
log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err)
Expand All @@ -509,30 +509,18 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
return
}

log.Printf("[DEBUG] Does user who is running health check have support access? %t", userInfo.SupportAccess)
log.Printf("[DEBUG] Is user api key same as ops dashboard api key? %t", userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY"))
//log.Printf("[DEBUG] Does user who is running health check have support access? %t", userInfo.SupportAccess)
//log.Printf("[DEBUG] Is user api key same as ops dashboard api key? %t", userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY"))

// Use channel for getting RunOpsWorkflow function results
workflowHealthChannel := make(chan WorkflowHealth)
// appHealthChannel := make(chan AppHealth)
go func() {
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
workflowHealth, err := RunOpsWorkflow(apiKey, orgId)
if err != nil {
log.Printf("[ERROR] Failed workflow health check: %s", err)
}

if workflowHealth.Create == true {
log.Printf("[DEBUG] Deleting created ops workflow")
err = deleteOpsWorkflow(workflowHealth, apiKey)
if err != nil {
log.Printf("[ERROR] Failed deleting workflow: %s", err)
} else {
log.Printf("[DEBUG] Deleted ops workflow successfully!")
workflowHealth.Delete = true
updateCache(workflowHealth)
}
}
workflowHealthChannel <- workflowHealth
}()

Expand Down Expand Up @@ -832,24 +820,19 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
workflowPtr, err := GetWorkflow(ctx, opsWorkflowID)
if err != nil {
log.Printf("[ERROR] Failed getting Health check workflow: %s", err)
log.Printf("[DEBUG] Creating health check workflow")
return workflowHealth, err
}

workflowHealth.Create = true
workflowHealth.WorkflowId = opsWorkflowID
updateCache(workflowHealth)
updateOpsCache(workflowHealth)

workflow := *workflowPtr

log.Printf("[DEBUG] Running health check workflow. workflowHealth till now: %#v", workflowHealth)

// 2. Run workflow
id := workflow.ID
_ = id
_ = orgId

// prepare the request
url := baseUrl + "/api/v1/workflows/" + id + "/execute"
log.Printf("[DEBUG] Running health check workflow with URL: %s", url)
req, err := http.NewRequest("POST", url, nil)
Expand All @@ -862,19 +845,6 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+apiKey)

// startId := "98713d6a-dd6b-4bd6-a11c-9778b80f2a28"
// body := map[string]string{"execution_argument": "", "start": startId}

// convert the body to JSON
// bodyJson, err := json.Marshal(body)
// if err != nil {
// log.Printf("[ERROR] Failed marshalling body: %s", err)
// return workflowHealth, err
// }

// set the body
// req.Body = ioutil.NopCloser(bytes.NewBuffer(bodyJson))

// send the request
client := &http.Client{}
resp, err := client.Do(req)
Expand Down Expand Up @@ -920,10 +890,21 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
workflowHealth.Run = true
workflowHealth.ExecutionId = execution.ExecutionId

updateCache(workflowHealth)

updateOpsCache(workflowHealth)
timeout := time.After(5 * time.Minute)

if workflowHealth.Create == true {
log.Printf("[DEBUG] Deleting created ops workflow")
err = deleteOpsWorkflow(workflowHealth, apiKey)
if err != nil {
log.Printf("[ERROR] Failed deleting workflow: %s", err)
} else {
log.Printf("[DEBUG] Deleted ops workflow successfully!")
workflowHealth.Delete = true
updateOpsCache(workflowHealth)
}
}

// 3. Check if workflow ran successfully
// ping /api/v1/streams/results/<execution_id> while workflowHealth.RunFinished is false
// if workflowHealth.RunFinished is true, return workflowHealth
Expand Down Expand Up @@ -981,7 +962,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
workflowHealth.RunStatus = executionResults.Status
}

updateCache(workflowHealth)
updateOpsCache(workflowHealth)

log.Printf("[DEBUG] Workflow Health execution Result Status: %#v for executionID: %s", executionResults.Status, workflowHealth.ExecutionId)

Expand Down Expand Up @@ -1041,11 +1022,9 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
return "", errors.New("Ops dashboard user not admin")
}

// make a GET request to https://shuffler.io/api/v1/workflows/602c7cf5-500e-4bd1-8a97-aa5bc8a554e6
// to get the workflow
url := "https://shuffler.io/api/v1/workflows/602c7cf5-500e-4bd1-8a97-aa5bc8a554e6"

// Should be local
// Create a new HTTP GET request
url := "https://shuffler.io/api/v1/workflows/602c7cf5-500e-4bd1-8a97-aa5bc8a554e6"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Println("[ERROR] creating HTTP request:", err)
Expand Down Expand Up @@ -1213,6 +1192,8 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
workflowData.OrgId = tmpworkflow.OrgId
workflowData.Owner = tmpworkflow.Owner
workflowData.ExecutingOrg = tmpworkflow.ExecutingOrg
workflowData.Hidden = true
workflowData.Public = false

// Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true

Expand Down
9 changes: 6 additions & 3 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5685,7 +5685,7 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {


// Some internal reserves
if (strings.ToLower(action.Name) == "send_sms_shuffle" || strings.ToLower(action.Name) == "send_email_shuffle") && param.Name == "apikey" {
if ((strings.ToLower(action.Name) == "send_sms_shuffle" || strings.ToLower(action.Name) == "send_email_shuffle") && param.Name == "apikey") || (action.Name == "repeat_back_to_me") {
} else {
thisError := fmt.Sprintf("Action %s is missing required parameter %s", action.Label, param.Name)
if actionParam.Configuration && len(action.AuthenticationId) == 0 {
Expand Down Expand Up @@ -15080,8 +15080,11 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
var execution ExecutionRequest
err = json.Unmarshal(body, &execution)
if err != nil {
//log.Printf("[WARNING] Failed execution POST unmarshaling - continuing anyway: %s", err)
//return WorkflowExecution{}, "", err
if len(string(body)) < 100 {
log.Printf("[WARNING] Failed execution POST unmarshaling - continuing anyway: '%s'. Err: %s", string(body), err)
} else {
log.Printf("[WARNING] Failed execution POST unmarshaling - continuing anyway: %s", err)
}
}

// Ensuring it works even if startpoint isn't defined
Expand Down

0 comments on commit e20e3ee

Please sign in to comment.