Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Oct 15, 2023
2 parents 47dbc97 + 22e69c4 commit 196e7dc
Showing 1 changed file with 119 additions and 21 deletions.
140 changes: 119 additions & 21 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,18 +459,25 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {

userInfo, err := HandleApiAuthentication(resp, request)
if err != nil {
log.Printf("[WARNING] Api authentication failed in handleInfo: %s. Continuing anyways here..", err)
log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Api authentication failed!"}`))
return
}

if project.Environment == "onprem" && userInfo.Role != "admin" {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`))
return
} else if project.Environment == "Cloud" && !(userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") || userInfo.SupportAccess) {
resp.WriteHeader(401)
} else if project.Environment == "Cloud" && (userInfo.ApiKey != os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") || userInfo.SupportAccess) {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`))
return
}

log.Printf("[DEBUG] does user who is running health check have support access? %t", userInfo.SupportAccess)
log.Printf("[DEBUG] Is user api key same as ops dashboard api key? %t", userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY"))

} else if force != "true" {
// get last health check from database
healths, err := GetPlatformHealth(ctx, 0, 0, 1)
Expand Down Expand Up @@ -589,23 +596,31 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) {
// convert all to int64
limitInt, err := strconv.Atoi(limit)
if err != nil {
log.Printf("[ERROR] Failed converting limit to int64: %s", err)
log.Printf("[ERROR] Failed converting limit to int: %s", err)
limitInt = 0
}

beforeInt, err := strconv.Atoi(before)
if err != nil {
log.Printf("[ERROR] Failed converting before to int64: %s", err)
log.Printf("[ERROR] Failed converting before to int: %s", err)
beforeInt = 0
}

afterInt, err := strconv.Atoi(after)
if err != nil {
log.Printf("[ERROR] Failed converting after to int64: %s", err)
log.Printf("[ERROR] Failed converting after to int: %s", err)
afterInt = 0
}

healthChecks, err := GetPlatformHealth(ctx, afterInt, beforeInt, limitInt)

if strings.Contains(err.Error(), "Bad statuscode: 404") && project.Environment == "onprem" {
log.Printf("[WARNING] Failed getting platform health from database: %s. Probably because no workflowexecutions have been done",err)
resp.WriteHeader(200)
resp.Write([]byte(`[]`))
return
}

if err != nil {
log.Printf("[ERROR] Failed getting platform health from database: %s", err)
resp.WriteHeader(500)
Expand Down Expand Up @@ -683,6 +698,93 @@ func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) error {
return nil
}

func fixOpensearch() error {
// Define the index mapping
mapping := `{
"properties": {
"workflow": {
"properties": {
"actions": {
"properties": {
"parameters": {
"properties": {
"value": {
"type": "text"
},
"example": {
"type": "text"
}
}
}
}
}
}
}
}
}`

// Get the username and password from environment variables
username := os.Getenv("SHUFFLE_OPENSEARCH_USERNAME")
if len(username) == 0 {
log.Printf("[DEBUG] Opensearch username not set. Setting to default")
username = "admin"
}

password := os.Getenv("SHUFFLE_OPENSEARCH_PASSWORD")
if len(password) == 0 {
log.Printf("[DEBUG] Opensearch password not set. Setting to default")
password = "admin"
}

opensearchUrl := os.Getenv("SHUFFLE_OPENSEARCH_URL")
if len(opensearchUrl) == 0 {
log.Printf("[DEBUG] Opensearch url not set. Setting to default")
opensearchUrl = "http://localhost:9200"
}

apiUrl := opensearchUrl + "/workflowexecution/_mapping"

log.Printf("[DEBUG] apiurl for fixing opensearch: %s", apiUrl)

// Create a new request
req, err := http.NewRequest("PUT", apiUrl, bytes.NewBufferString(mapping))
if err != nil {
log.Fatalf("Error creating the request: %s", err)
}

// Set the request headers
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(username, password)

// Create a new HTTP client
client := &http.Client{}

// Send the request in a loop until a 200 status code is received
res, err := client.Do(req)
if err != nil {
log.Printf("Error sending the request while fixing execution body: %s", err)
return err
}

// Read the response body
body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Printf("Error reading the response body while fixing execution body: %s", err)
return err
}
res.Body.Close()

if res.StatusCode == 200 {
log.Printf("Index created successfully: %s. Opensearch mappings should be fixed.", body)
return nil
} else {
log.Printf("Failed to create index, retrying: %s", body)
return errors.New("Failed index mapping")
}

return nil
}

func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
// run workflow with id 602c7cf5-500e-4bd1-8a97-aa5bc8a554e6
ctx := context.Background()
Expand Down Expand Up @@ -733,7 +835,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {

workflow := *workflowPtr

log.Printf("[DEBUG] Running health check workflow")
log.Printf("[DEBUG] Running health check workflow. workflowHealth till now: %#v", workflowHealth)

// 2. Run workflow
id := workflow.ID
Expand Down Expand Up @@ -777,17 +879,23 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
defer resp.Body.Close()

if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed running health check workflow: %s. The status code is: %d", err, resp.StatusCode)
log.Printf("[ERROR] Failed running health check workflow: %s. The status code is: %d", id, resp.StatusCode)
// print the response body
respBodyErr, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading health check HTTP response body: %s", err)
} else {
log.Printf("[ERROR] Health check running Workflow Response: %s", respBodyErr)
}

log.Printf("[DEBUG] Setting workflowHealth.Create = false")
workflowHealth.Create = false
if project.Environment == "onprem" {
log.Printf("Trying to fix opensearch mappings")
err = fixOpensearch()
if err != nil {
log.Printf("[ERROR] Failed fixing opensearch mappings: %s", err)
} else {
log.Printf("[DEBUG] Fixed opensearch mappings successfully! Maybe try ops dashboard again?")
}
}

return workflowHealth, err
}
Expand Down Expand Up @@ -1005,12 +1113,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
}

workflowData.Actions[actionIndex] = action
// if ArrayContains(blacklisted, action.Label) {
// // dates keep failing in opensearch
// // this is a grander issue, but for now, we'll just skip these actions
// log.Printf("[WARNING] Skipping action %s", action.Label)
// continue
// }

actions = append(actions, action)
}
Expand Down Expand Up @@ -1089,8 +1191,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
return "", errors.New("Error reading HTTP response response body: " + err.Error())
}

log.Printf("[DEBUG] body is: %s", body)

var tmpworkflow Workflow

// Unmarshal the JSON data into a Workflow instance
Expand Down Expand Up @@ -1126,8 +1226,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
return "", err
}

log.Printf("[DEBUG] Sending workflow JSON data: %s", workflowDataJSON)

// set the body
req.Body = ioutil.NopCloser(bytes.NewBuffer(workflowDataJSON))

Expand Down

0 comments on commit 196e7dc

Please sign in to comment.