From 96bf86986649733675cac0769417f3702b6b54b6 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sun, 3 Sep 2023 20:44:11 +0530 Subject: [PATCH 01/15] testing --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 35e601f..1d89e3e 100755 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/shuffle/shuffle-shared +module github.com/0x0elliot/shuffle-shared // Keep on 1.11 until AppEngine supports 1.17 or higher From 6f653abdfcc1eb995d4b1ae30e0a4b6c852bc789 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Tue, 10 Oct 2023 19:42:45 +0530 Subject: [PATCH 02/15] fix: first draft --- db-connector.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++ health.go | 56 +++++++++++++++++++++++++------- structs.go | 25 +++++++++++++++ 3 files changed, 155 insertions(+), 11 deletions(-) diff --git a/db-connector.go b/db-connector.go index 7b14be5..66d82c3 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5341,6 +5341,91 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error { return nil } +func GetLatestPlatformHealth(ctx context.Context) (HealthCheckDB, error) { + nameKey := "platform_health" + + // sort by "updated", and get the first one + health := HealthCheckDB{} + if project.DbType == "opensearch" { + var buf bytes.Buffer + query := map[string]interface{}{ + "from": 0, + "size": 1, + "sort": map[string]interface{}{ + "updated": map[string]interface{}{ + "order": "desc", + }, + }, + } + + if err := json.NewEncoder(&buf).Encode(query); err != nil { + log.Printf("[WARNING] Error encoding find user query: %s", err) + return health, err + } + + res, err := project.Es.Search( + project.Es.Search.WithContext(ctx), + project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(nameKey))), + project.Es.Search.WithBody(&buf), + project.Es.Search.WithTrackTotalHits(true), + ) + if err != nil { + log.Printf("[ERROR] Error getting response from Opensearch (get latest platform health): %s", err) + return health, err + } + defer res.Body.Close() + + if res.StatusCode != 200 && res.StatusCode != 201 { + return health, errors.New(fmt.Sprintf("Bad statuscode: %d", res.StatusCode)) + } + + if res.IsError() { + var e map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&e); err != nil { + log.Printf("[WARNING] Error parsing the response body: %s", err) + return health, err + } else { + // Print the response status and error information. + log.Printf("[%s] %s: %s", + res.Status(), + e["error"].(map[string]interface{})["type"], + e["error"].(map[string]interface{})["reason"], + ) + } + } + + respBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return health, err + } + + wrapped := HealthCheckSearchWrapper{} + err = json.Unmarshal(respBody, &wrapped) + if err != nil { + return health, err + } + + if len(wrapped.Hits.Hits) == 0 { + return health, errors.New("No healthchecks found") + } + + health = wrapped.Hits.Hits[0].Source + } else { + q := datastore.NewQuery(nameKey).Order("-updated").Limit(1) + _, err := project.Dbclient.GetAll(ctx, q, &health) + if err != nil { + log.Printf("[WARNING] Error getting latest platform health: %s", err) + return health, err + } + + if len(health) == 0 { + return health, errors.New("No healthchecks found") + } + } + + return health, nil +} + func SetPlatformHealth(ctx context.Context, health HealthCheckDB) error { nameKey := "platform_health" diff --git a/health.go b/health.go index 373c4ea..2d8da24 100644 --- a/health.go +++ b/health.go @@ -393,12 +393,13 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { if cors { return } - // Check cache if health check was run in last 5 minutes - // If yes, return cached result, else run health check + + // check if there is a force parameter + force := request.URL.Query().Get("force") + ctx := GetContext(request) platformHealth := HealthCheck{} cacheKey := fmt.Sprintf("ops-health-check") - cacheHit := false memcacheUrl := os.Getenv("SHUFFLE_MEMCACHED") @@ -419,16 +420,8 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { //log.Printf("CACHEDATA: %s", cacheData) err = json.Unmarshal(cacheData, &platformHealth) if err == nil { - // FIXME: Check if last updated is less than 5 minutes with platformHealth.Edited in unix time - // If yes, return cached result, else run health check log.Printf("Platform health returned: %#v", platformHealth) marshalledData, err := json.Marshal(platformHealth) - cacheHit = true - - err_ := SetOpsDashboardCacheHitStat(ctx, cacheHit) - if err_ != nil { - log.Printf("[WARNING] Failed setting cache hit stat: %s", err_) - } if err == nil { resp.WriteHeader(200) @@ -448,6 +441,47 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } + if force == "true" && project.Environment == "onprem" { + log.Printf("[DEBUG] Force is true. Running health check") + + userInfo, err := shuffle.HandleApiAuthentication(resp, request) + if err != nil { + log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err) + + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } + + if userInfo.Role != "admin" { + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`)) + return + } + } else if force != "true" { + // get last health check from database + health, err := GetPlatformHealth(ctx) + if err == nil { + log.Printf("[DEBUG] Last health check was: %#v", health) + platformData, err := json.Marshal(health) + if err != nil { + log.Printf("[ERROR] Failed marshalling platform health data: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Failed JSON parsing platform health."}`)) + return + } + + resp.WriteHeader(200) + resp.Write(platformData) + return + } else { + log.Printf("[WARNING] Failed getting platform health from database: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Failed getting platform health from database."}`)) + return + } + } + // Use channel for getting RunOpsWorkflow function results workflowHealthChannel := make(chan WorkflowHealth) // appHealthChannel := make(chan AppHealth) diff --git a/structs.go b/structs.go index 778c8e9..54616c8 100755 --- a/structs.go +++ b/structs.go @@ -1576,6 +1576,31 @@ type StatisticsItem struct { OrgId string `json:"org_id" datastore:"org_id"` } +type HealthCheckSearchWrapper struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total struct { + Value int `json:"value"` + Relation string `json:"relation"` + } `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Score float64 `json:"_score"` + Source HealthCheck `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} + type NewValueSearchWrapper struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` From 152f1e800a59902cccb8561ff529303d3e02cc8e Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 05:42:37 +0530 Subject: [PATCH 03/15] fix: adding GetFirstOrg function --- db-connector.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/db-connector.go b/db-connector.go index 66d82c3..6f0aeff 100755 --- a/db-connector.go +++ b/db-connector.go @@ -2849,6 +2849,61 @@ func GetOrg(ctx context.Context, id string) (*Org, error) { return curOrg, nil } +func GetFirstOrg(ctx context.Context) (*Org, error) { + nameKey := "Organizations" + + curOrg := &Org{} + if project.DbType == "opensearch" { + res, err := project.Es.Search( + project.Es.Search.WithContext(ctx), + project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(nameKey))), + project.Es.Search.WithTrackTotalHits(true), + ) + if err != nil { + log.Printf("[ERROR] Error getting response from Opensearch (get first org): %s", err) + return curOrg, err + } + + defer res.Body.Close() + if res.StatusCode != 200 && res.StatusCode != 201 { + return curOrg, errors.New(fmt.Sprintf("Bad statuscode: %d", res.StatusCode)) + } + + respBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return curOrg, err + } + + wrapped := OrgSearchWrapper{} + err = json.Unmarshal(respBody, &wrapped) + if err != nil { + return curOrg, err + } + + if len(wrapped.Hits.Hits) > 0 { + curOrg = &wrapped.Hits.Hits[0].Source + } else { + return curOrg, errors.New("No orgs found") + } + + } else { + query := datastore.NewQuery(nameKey).Limit(1) + allOrgs := []Org{} + _, err := project.Dbclient.GetAll(ctx, query, &allOrgs) + if err != nil { + return curOrg, err + } + + if len(allOrgs) > 0 { + curOrg = &allOrgs[0] + } else { + return curOrg, errors.New("No orgs found") + } + } + + return curOrg, nil +} + func indexEs(ctx context.Context, nameKey, id string, bytes []byte) error { //req := esapi.IndexRequest{ req := opensearchapi.IndexRequest{ From 1cb31e4ddc07acb26033131f2ce40ab184f05e62 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 05:45:49 +0530 Subject: [PATCH 04/15] fix: fixing search function bugs --- structs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/structs.go b/structs.go index 54616c8..1590884 100755 --- a/structs.go +++ b/structs.go @@ -1596,7 +1596,7 @@ type HealthCheckSearchWrapper struct { Type string `json:"_type"` ID string `json:"_id"` Score float64 `json:"_score"` - Source HealthCheck `json:"_source"` + Source HealthCheckDB `json:"_source"` } `json:"hits"` } `json:"hits"` } From bd7c5b69f934ed46ac5573f093630c8aceca8bcb Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 05:47:13 +0530 Subject: [PATCH 05/15] fix: fixing search function bugs --- db-connector.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/db-connector.go b/db-connector.go index 6f0aeff..7b9fb01 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5472,10 +5472,6 @@ func GetLatestPlatformHealth(ctx context.Context) (HealthCheckDB, error) { log.Printf("[WARNING] Error getting latest platform health: %s", err) return health, err } - - if len(health) == 0 { - return health, errors.New("No healthchecks found") - } } return health, nil From e52ec61b1ca7862436d5382ae919020580a3ea67 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 06:01:51 +0530 Subject: [PATCH 06/15] fix: making ops dashboard automatically read the first user and then run without cache --- db-connector.go | 22 ++++++++++++++++------ health.go | 30 +++++++++++++++++++++++------- 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/db-connector.go b/db-connector.go index 7b9fb01..a6f7c3b 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5396,16 +5396,14 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error { return nil } -func GetLatestPlatformHealth(ctx context.Context) (HealthCheckDB, error) { +func GetPlatformHealth(ctx context.Context, getLatest bool) ([]HealthCheckDB, error) { nameKey := "platform_health" // sort by "updated", and get the first one - health := HealthCheckDB{} + health := []HealthCheckDB{} if project.DbType == "opensearch" { var buf bytes.Buffer query := map[string]interface{}{ - "from": 0, - "size": 1, "sort": map[string]interface{}{ "updated": map[string]interface{}{ "order": "desc", @@ -5413,6 +5411,10 @@ func GetLatestPlatformHealth(ctx context.Context) (HealthCheckDB, error) { }, } + if getLatest { + query["size"] = 1 + } + if err := json.NewEncoder(&buf).Encode(query); err != nil { log.Printf("[WARNING] Error encoding find user query: %s", err) return health, err @@ -5464,9 +5466,17 @@ func GetLatestPlatformHealth(ctx context.Context) (HealthCheckDB, error) { return health, errors.New("No healthchecks found") } - health = wrapped.Hits.Hits[0].Source + for _, hit := range wrapped.Hits.Hits { + health = append(health, hit.Source) + } + } else { - q := datastore.NewQuery(nameKey).Order("-updated").Limit(1) + q := datastore.NewQuery(nameKey).Order("-updated") + + if getLatest { + q = q.Limit(1) + } + _, err := project.Dbclient.GetAll(ctx, q, &health) if err != nil { log.Printf("[WARNING] Error getting latest platform health: %s", err) diff --git a/health.go b/health.go index 2d8da24..72f7e23 100644 --- a/health.go +++ b/health.go @@ -401,11 +401,23 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { platformHealth := HealthCheck{} cacheKey := fmt.Sprintf("ops-health-check") - memcacheUrl := os.Getenv("SHUFFLE_MEMCACHED") - apiKey := os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") orgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG") + if project.Environment == "onprem" && (len(apiKey) == 0 || len(orgId) == 0) { + log.Printf("[DEBUG] Ops dashboard api key or org not set. Getting first org and user") + org, err := GetFirstOrg(ctx) + if err != nil { + log.Printf("[ERROR] Failed getting first org: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Set up a user and org first!")}`)) + return + } + + orgId = org.Id + apiKey = org.Users[0].ApiKey + } + if len(apiKey) == 0 || len(orgId) == 0 { log.Printf("[WARNING] Ops dashboard api key or org not set. Not setting up ops workflow") resp.WriteHeader(500) @@ -413,7 +425,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } - if len(memcacheUrl) != 0 { + if project.CacheDb { cache, err := GetCache(ctx, cacheKey) if err == nil { cacheData := []byte(cache.([]uint8)) @@ -435,16 +447,16 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { log.Printf("[WARNING] Failed getting cache ops health on first try: %s", err) } } else { - log.Println("[WARNING] Memcache URL not set! Exiting..") + log.Println("[WARNING] Cache not enabled. Not using cache for ops health isn't recommended!") resp.WriteHeader(500) - resp.Write([]byte(`{"success": false, "reason": "SHUFFLE_MEMCACHED not set. Please set memcached to use this feature!"}`)) + resp.Write([]byte(`{"success": false, "reason": "Cache not enabled. Not using cache for ops health isn't recommended!"}`)) return } if force == "true" && project.Environment == "onprem" { log.Printf("[DEBUG] Force is true. Running health check") - userInfo, err := shuffle.HandleApiAuthentication(resp, request) + userInfo, err := HandleApiAuthentication(resp, request) if err != nil { log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err) @@ -460,7 +472,10 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } } else if force != "true" { // get last health check from database - health, err := GetPlatformHealth(ctx) + healths, err := GetPlatformHealth(ctx, true) + + health := healths[0] + if err == nil { log.Printf("[DEBUG] Last health check was: %#v", health) platformData, err := json.Marshal(health) @@ -864,6 +879,7 @@ func InitOpsWorkflow() (string, error) { workflowData.Public = false workflowData.Status = "" workflowData.Name = "Ops Dashboard Workflow" + workflowData.Hidden = true var actions []Action // var blacklisted = []string{"Date_to_epoch", "input_data", "Compare_timestamps", "Get_current_timestamp"} From 31bd58974d6d29243435e42009b48e00ef01d2ac Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 07:10:29 +0530 Subject: [PATCH 07/15] fix: fixing bugs and making force work --- health.go | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/health.go b/health.go index 72f7e23..2b91c2d 100644 --- a/health.go +++ b/health.go @@ -61,7 +61,7 @@ func base64StringToString(base64String string) (string, error) { return string(decoded), nil } -func RunOpsAppHealthCheck() (AppHealth, error) { +func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { log.Printf("[DEBUG] Running app health check") appHealth := AppHealth{ Create: false, @@ -145,7 +145,7 @@ func RunOpsAppHealthCheck() (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer " + apiKey) // send the request client = &http.Client{} @@ -414,6 +414,8 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } + log.Printf("[DEBUG] Setting api key to that of user %s and org id to %s ", org.Users[0].ApiKey, org.Id) + orgId = org.Id apiKey = org.Users[0].ApiKey } @@ -425,7 +427,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } - if project.CacheDb { + if project.CacheDb && force != "true" { cache, err := GetCache(ctx, cacheKey) if err == nil { cacheData := []byte(cache.([]uint8)) @@ -446,7 +448,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } else { log.Printf("[WARNING] Failed getting cache ops health on first try: %s", err) } - } else { + } else if !(project.CacheDb) { log.Println("[WARNING] Cache not enabled. Not using cache for ops health isn't recommended!") resp.WriteHeader(500) resp.Write([]byte(`{"success": false, "reason": "Cache not enabled. Not using cache for ops health isn't recommended!"}`)) @@ -501,7 +503,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { workflowHealthChannel := make(chan WorkflowHealth) // appHealthChannel := make(chan AppHealth) go func() { - workflowHealth, err := RunOpsWorkflow() + workflowHealth, err := RunOpsWorkflow(apiKey, orgId) if err != nil { log.Printf("[ERROR] Failed running workflow health check: %s", err) workflowHealthChannel <- workflowHealth @@ -579,7 +581,7 @@ func OpsDashboardCacheHitStat(resp http.ResponseWriter, request *http.Request) { resp.Write(statsBody) } -func RunOpsWorkflow() (WorkflowHealth, error) { +func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { // run workflow with id 602c7cf5-500e-4bd1-8a97-aa5bc8a554e6 ctx := context.Background() @@ -593,7 +595,7 @@ func RunOpsWorkflow() (WorkflowHealth, error) { } // 1. Get workflow - opsWorkflowID, err := InitOpsWorkflow() + opsWorkflowID, err := InitOpsWorkflow(apiKey, orgId) if err != nil { log.Printf("[ERROR] Failed creating Health check workflow: %s", err) return workflowHealth, err @@ -615,7 +617,6 @@ func RunOpsWorkflow() (WorkflowHealth, error) { // 2. Run workflow id := workflow.ID - orgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG") _ = id _ = orgId @@ -639,7 +640,7 @@ func RunOpsWorkflow() (WorkflowHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer "+ apiKey) // startId := "98713d6a-dd6b-4bd6-a11c-9778b80f2a28" // body := map[string]string{"execution_argument": "", "start": startId} @@ -705,7 +706,7 @@ func RunOpsWorkflow() (WorkflowHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer " + apiKey) // convert the body to JSON reqBody := map[string]string{"execution_id": execution.ExecutionId, "authorization": os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")} @@ -769,7 +770,7 @@ func RunOpsWorkflow() (WorkflowHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -791,15 +792,17 @@ func RunOpsWorkflow() (WorkflowHealth, error) { return workflowHealth, nil } -func InitOpsWorkflow() (string, error) { - opsDashboardApikey := os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") +func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { + opsDashboardApikey := apiKey + opsDashboardOrgId := OrgId + if len(opsDashboardApikey) == 0 { log.Printf("[WARNING] Ops dashboard api key not set. Not setting up ops workflow") return "", errors.New("Ops dashboard api key not set") } - opsDashboardOrgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG") + if len(opsDashboardOrgId) == 0 { log.Printf("[WARNING] Ops dashboard org not set. Not setting up ops workflow") return "", errors.New("Ops dashboard org not set") From 7ad0936e0fefab87ad2adaa45a4582d3c65d4851 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 07:30:05 +0530 Subject: [PATCH 08/15] fix: making created workflows get deleted, if execution becomes an issue --- health.go | 107 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 64 insertions(+), 43 deletions(-) diff --git a/health.go b/health.go index 2b91c2d..c4afb9c 100644 --- a/health.go +++ b/health.go @@ -506,8 +506,14 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { workflowHealth, err := RunOpsWorkflow(apiKey, orgId) if err != nil { log.Printf("[ERROR] Failed running workflow health check: %s", err) - workflowHealthChannel <- workflowHealth - return + + if workflowHealth.Create == true { + log.Printf("[DEBUG] Deleting created ops workflow") + workflowHealth, err = deleteWorkflow(workflowHealth, apiKey) + if err != nil { + log.Printf("[ERROR] Failed deleting workflow: %s", err) + } + } } workflowHealthChannel <- workflowHealth }() @@ -581,6 +587,53 @@ func OpsDashboardCacheHitStat(resp http.ResponseWriter, request *http.Request) { resp.Write(statsBody) } +func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (WorkflowHealth, error) { + baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") + if len(baseUrl) == 0 { + baseUrl = "https://shuffler.io" + } + + if project.Environment == "onprem" { + baseUrl = "http://localhost:5001" + } + + id := workflowHealth.ExecutionId + + // 4. Delete workflow + // make a DELETE request to https://shuffler.io/api/v1/workflows/ + url := baseUrl + "/api/v1/workflows/" + id + log.Printf("[DEBUG] Deleting workflow with id: %s", id) + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + log.Printf("[ERROR] Failed creating HTTP request: %s", err) + return workflowHealth, err + } + + // set the headers + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+apiKey) + + // send the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Printf("[ERROR] Failed deleting the health check workflow with HTTP request: %s", err) + return workflowHealth, err + } + + if resp.StatusCode != 200 { + log.Printf("[ERROR] Failed deleting the health check workflow: %s. The status code was: %d", err, resp.StatusCode) + return workflowHealth, err + } + + defer resp.Body.Close() + + workflowHealth.Delete = true + + return workflowHealth, nil +} + func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { // run workflow with id 602c7cf5-500e-4bd1-8a97-aa5bc8a554e6 ctx := context.Background() @@ -594,6 +647,15 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { ExecutionId: "", } + baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") + if len(baseUrl) == 0 { + baseUrl = "https://shuffler.io" + } + + if project.Environment == "onprem" { + baseUrl = "http://localhost:5001" + } + // 1. Get workflow opsWorkflowID, err := InitOpsWorkflow(apiKey, orgId) if err != nil { @@ -620,15 +682,6 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { _ = id _ = orgId - baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") - if len(baseUrl) == 0 { - baseUrl = "https://shuffler.io" - } - - if project.Environment == "onprem" { - baseUrl = "http://localhost:5001" - } - // prepare the request url := baseUrl + "/api/v1/workflows/" + id + "/execute" log.Printf("[DEBUG] Running health check workflow with URL: %s", url) @@ -757,38 +810,6 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { time.Sleep(2 * time.Second) } - // 4. Delete workflow - // make a DELETE request to https://shuffler.io/api/v1/workflows/ - url = baseUrl + "/api/v1/workflows/" + id - log.Printf("[DEBUG] Deleting workflow with id: %s", id) - - req, err = http.NewRequest("DELETE", url, nil) - if err != nil { - log.Printf("[ERROR] Failed creating HTTP request: %s", err) - return workflowHealth, err - } - - // set the headers - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+apiKey) - - // send the request - client = &http.Client{} - resp, err = client.Do(req) - if err != nil { - log.Printf("[ERROR] Failed deleting the health check workflow with HTTP request: %s", err) - return workflowHealth, err - } - - if resp.StatusCode != 200 { - log.Printf("[ERROR] Failed deleting the health check workflow: %s. The status code was: %d", err, resp.StatusCode) - return workflowHealth, err - } - - defer resp.Body.Close() - - workflowHealth.Delete = true - return workflowHealth, nil } From 05e5d385ef51cfcad03fc459016bc69a0c481aea Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 07:41:29 +0530 Subject: [PATCH 09/15] fix: fixing cache issues --- health.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/health.go b/health.go index c4afb9c..0426c5d 100644 --- a/health.go +++ b/health.go @@ -509,9 +509,13 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { if workflowHealth.Create == true { log.Printf("[DEBUG] Deleting created ops workflow") - workflowHealth, err = deleteWorkflow(workflowHealth, apiKey) + err = deleteWorkflow(workflowHealth, apiKey) if err != nil { log.Printf("[ERROR] Failed deleting workflow: %s", err) + } else { + log.Printf("[DEBUG] Deleted ops workflow successfully!") + workflowHealth.Delete = true + updateCache(workflowHealth) } } } @@ -587,7 +591,7 @@ func OpsDashboardCacheHitStat(resp http.ResponseWriter, request *http.Request) { resp.Write(statsBody) } -func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (WorkflowHealth, error) { +func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (error) { baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") if len(baseUrl) == 0 { baseUrl = "https://shuffler.io" @@ -607,7 +611,7 @@ func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (WorkflowHeal req, err := http.NewRequest("DELETE", url, nil) if err != nil { log.Printf("[ERROR] Failed creating HTTP request: %s", err) - return workflowHealth, err + return err } // set the headers @@ -619,19 +623,17 @@ func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (WorkflowHeal resp, err := client.Do(req) if err != nil { log.Printf("[ERROR] Failed deleting the health check workflow with HTTP request: %s", err) - return workflowHealth, err + return err } if resp.StatusCode != 200 { log.Printf("[ERROR] Failed deleting the health check workflow: %s. The status code was: %d", err, resp.StatusCode) - return workflowHealth, err + return err } defer resp.Body.Close() - workflowHealth.Delete = true - - return workflowHealth, nil + return nil } func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { From 9cf3338c06a15b881967b79573b2df1c0898eb42 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 07:48:49 +0530 Subject: [PATCH 10/15] fix: fixing permissions --- health.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/health.go b/health.go index 0426c5d..a6df967 100644 --- a/health.go +++ b/health.go @@ -455,7 +455,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } - if force == "true" && project.Environment == "onprem" { + if force == "true" { log.Printf("[DEBUG] Force is true. Running health check") userInfo, err := HandleApiAuthentication(resp, request) @@ -467,7 +467,11 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } - if userInfo.Role != "admin" { + if project.Environment == "onprem" && userInfo.Role != "admin" { + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`)) + return + } else if project.Environment == "Cloud" && userInfo.ApiKey != os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") { resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`)) return From 6a9e112abf077382d6f3e1059f4e907eb7d342b4 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 07:59:17 +0530 Subject: [PATCH 11/15] fix: workflow health delete always false bug --- health.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/health.go b/health.go index a6df967..5ce4c6a 100644 --- a/health.go +++ b/health.go @@ -509,18 +509,17 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { go func() { workflowHealth, err := RunOpsWorkflow(apiKey, orgId) if err != nil { - log.Printf("[ERROR] Failed running workflow health check: %s", err) - - if workflowHealth.Create == true { - log.Printf("[DEBUG] Deleting created ops workflow") - err = deleteWorkflow(workflowHealth, apiKey) - if err != nil { - log.Printf("[ERROR] Failed deleting workflow: %s", err) - } else { - log.Printf("[DEBUG] Deleted ops workflow successfully!") - workflowHealth.Delete = true - updateCache(workflowHealth) - } + log.Printf("[ERROR] Failed workflow health check: %s", err) + } + if workflowHealth.Create == true { + log.Printf("[DEBUG] Deleting created ops workflow") + err = deleteWorkflow(workflowHealth, apiKey) + if err != nil { + log.Printf("[ERROR] Failed deleting workflow: %s", err) + } else { + log.Printf("[DEBUG] Deleted ops workflow successfully!") + workflowHealth.Delete = true + updateCache(workflowHealth) } } workflowHealthChannel <- workflowHealth From e758bc72b15f6c41f618e390a59890b958523fe3 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 08:46:21 +0530 Subject: [PATCH 12/15] fix: removing stats --- db-connector.go | 108 ------------------------------------------------ health.go | 24 ----------- structs.go | 30 -------------- 3 files changed, 162 deletions(-) diff --git a/db-connector.go b/db-connector.go index a6f7c3b..2532a6c 100755 --- a/db-connector.go +++ b/db-connector.go @@ -3904,114 +3904,6 @@ func GetUser(ctx context.Context, username string) (*User, error) { return curUser, nil } -func GetOpsDashboardCacheHitStat(ctx context.Context, limit int) ([]OpsDashboardStats, error) { - // get last 30 runs - nameKey := "opsdashboardstats" - var stats []OpsDashboardStats - - if project.DbType == "opensearch" { - var buf bytes.Buffer - query := map[string]interface{}{ - "size": limit, - "sort": map[string]interface{}{ - "timestamp": map[string]interface{}{ - "order": "desc", - }, - }, - } - - if err := json.NewEncoder(&buf).Encode(query); err != nil { - log.Printf("[WARNING] Error encoding find app query: %s", err) - return stats, err - } - - res, err := project.Es.Search( - project.Es.Search.WithContext(ctx), - project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(nameKey))), - project.Es.Search.WithBody(&buf), - project.Es.Search.WithTrackTotalHits(true), - ) - if err != nil { - log.Printf("[ERROR] Error getting response from Opensearch (find app by name): %s", err) - return stats, err - } - - defer res.Body.Close() - if res.StatusCode == 404 { - return stats, nil - } - - defer res.Body.Close() - if res.IsError() { - var e map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&e); err != nil { - log.Printf("[WARNING] Error parsing the response body: %s", err) - return stats, err - } - } - - if res.StatusCode != 200 && res.StatusCode != 201 { - return stats, errors.New(fmt.Sprintf("Bad statuscode: %d", res.StatusCode)) - } - - respBody, err := ioutil.ReadAll(res.Body) - - var wrapped OpsDashboardStatSearchWrapper - - // put into struct - err = json.Unmarshal(respBody, &wrapped) - - stats = []OpsDashboardStats{} - for _, hit := range wrapped.Hits.Hits { - stats = append(stats, hit.Source) - } - - if err != nil { - log.Printf("RespBody is: %s", respBody) - return stats, err - } - - log.Printf("[INFO] Successfully got ops dashboard stats") - - } else { - q := datastore.NewQuery(nameKey).Order("-Timestamp").Limit(limit) - _, err := project.Dbclient.GetAll(ctx, q, &stats) - if err != nil { - log.Printf("[WARNING] Failed getting stats for ops dashboard: %s", err) - return stats, err - } - } - - return stats, nil -} - -func SetOpsDashboardCacheHitStat(ctx context.Context, cacheHit bool) error { - nameKey := "opsdashboardstats" - stat := OpsDashboardStats{} - stat.Timestamp = time.Now().Unix() - stat.CacheHit = cacheHit - - data, err := json.Marshal(stat) - - if project.DbType == "opensearch" { - err = indexEs(ctx, nameKey, fmt.Sprintf("%d", stat.Timestamp), data) - if err != nil { - return err - } else { - log.Printf("[INFO] Successfully updated ops dashboard stats") - } - } else { - k := datastore.NameKey(nameKey, fmt.Sprintf("%d", stat.Timestamp), nil) - if _, err := project.Dbclient.Put(ctx, k, &stat); err != nil { - log.Printf("[WARNING] Error updating ops dashboard stats: %s", err) - return err - } - } - - return nil - -} - func SetUser(ctx context.Context, user *User, updateOrg bool) error { log.Printf("[INFO] Updating a user (%s) that has the role %s with %d apps and %d orgs. Org updater: %t", user.Username, user.Role, len(user.PrivateApps), len(user.Orgs), updateOrg) parsedKey := user.Id diff --git a/health.go b/health.go index 5ce4c6a..dca3e55 100644 --- a/health.go +++ b/health.go @@ -570,30 +570,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { resp.Write(platformData) } -func OpsDashboardCacheHitStat(resp http.ResponseWriter, request *http.Request) { - ctx := GetContext(request) - - stats, err := GetOpsDashboardCacheHitStat(ctx, 30) - if err != nil { - log.Printf("[ERROR] Failed getting cache hit stats: %s", err) - resp.WriteHeader(500) - resp.Write([]byte(`{"success": false, "reason": "Failed getting cache hit stats. Contact support@shuffler.io"}`)) - return - } - - log.Printf("Stats are: %#v", stats) - - statsBody, err := json.Marshal(stats) - if err != nil { - log.Printf("[ERROR] Failed marshalling cache hit stats: %s", err) - resp.WriteHeader(500) - resp.Write([]byte(`{"success": false, "reason": "Failed JSON parsing cache hit stats. Contact support@shuffler.io"}`)) - } - - resp.WriteHeader(200) - resp.Write(statsBody) -} - func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (error) { baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") if len(baseUrl) == 0 { diff --git a/structs.go b/structs.go index 1590884..13ed3e0 100755 --- a/structs.go +++ b/structs.go @@ -1044,11 +1044,6 @@ type Comment struct { } `json:"position"` } -type OpsDashboardStats struct { - Timestamp int64 `json:"timestamp" datastore:"timestamp"` - CacheHit bool `json:"cachehit" datastore:"cachehit"` -} - type Workflow struct { Actions []Action `json:"actions" datastore:"actions,noindex"` Branches []Branch `json:"branches" datastore:"branches,noindex"` @@ -1901,31 +1896,6 @@ type ExecRequestSearchWrapper struct { } `json:"hits"` } -type OpsDashboardStatSearchWrapper struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Shards struct { - Total int `json:"total"` - Successful int `json:"successful"` - Skipped int `json:"skipped"` - Failed int `json:"failed"` - } `json:"_shards"` - Hits struct { - Total struct { - Value int `json:"value"` - Relation string `json:"relation"` - } `json:"total"` - MaxScore float64 `json:"max_score"` - Hits []struct { - Index string `json:"_index"` - Type string `json:"_type"` - ID string `json:"_id"` - Score float64 `json:"_score"` - Source OpsDashboardStats `json:"_source"` - } `json:"hits"` - } `json:"hits"` -} - type AppAuthSearchWrapper struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` From aef3453bedd6b900d609eb7080077757f815ca48 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 08:52:24 +0530 Subject: [PATCH 13/15] fix: for the future --- health.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/health.go b/health.go index dca3e55..d2e5efe 100644 --- a/health.go +++ b/health.go @@ -203,7 +203,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer "+ apiKey) // send the request client = &http.Client{} @@ -249,7 +249,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { } req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer "+ apiKey) // send the request client = &http.Client{} @@ -284,7 +284,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { executeBody.Parameters = []WorkflowAppActionParameter{ { Name: "apikey", - Value: os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY"), + Value: apiKey, }, { Name: "url", @@ -307,7 +307,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer "+ apiKey) // send the request client = &http.Client{} @@ -365,7 +365,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")) + req.Header.Set("Authorization", "Bearer " + apiKey) // send the request client = &http.Client{} From 50d4349d8dffe108bf64dd37049f0eda1aba8316 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 09:04:42 +0530 Subject: [PATCH 14/15] feat: stats API --- db-connector.go | 14 ++++---------- health.go | 29 ++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/db-connector.go b/db-connector.go index 2532a6c..af3759b 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5288,11 +5288,12 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error { return nil } -func GetPlatformHealth(ctx context.Context, getLatest bool) ([]HealthCheckDB, error) { +func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) { nameKey := "platform_health" // sort by "updated", and get the first one health := []HealthCheckDB{} + if project.DbType == "opensearch" { var buf bytes.Buffer query := map[string]interface{}{ @@ -5301,10 +5302,7 @@ func GetPlatformHealth(ctx context.Context, getLatest bool) ([]HealthCheckDB, er "order": "desc", }, }, - } - - if getLatest { - query["size"] = 1 + "size": limit, } if err := json.NewEncoder(&buf).Encode(query); err != nil { @@ -5363,11 +5361,7 @@ func GetPlatformHealth(ctx context.Context, getLatest bool) ([]HealthCheckDB, er } } else { - q := datastore.NewQuery(nameKey).Order("-updated") - - if getLatest { - q = q.Limit(1) - } + q := datastore.NewQuery(nameKey).Order("-updated").Limit(limit) _, err := project.Dbclient.GetAll(ctx, q, &health) if err != nil { diff --git a/health.go b/health.go index d2e5efe..65335c2 100644 --- a/health.go +++ b/health.go @@ -478,7 +478,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } } else if force != "true" { // get last health check from database - healths, err := GetPlatformHealth(ctx, true) + healths, err := GetPlatformHealth(ctx, 1) health := healths[0] @@ -570,6 +570,33 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { resp.Write(platformData) } +func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { + // for now, the limit is last 100 runs + limit := 100 + + healthChecks := []HealthCheckDB{} + ctx := GetContext(request) + + healthChecks, err := GetPlatformHealth(ctx, limit) + if err != nil { + log.Printf("[ERROR] Failed getting platform health from database: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Failed getting platform health from database."}`)) + return + } + + healthChecksData, err := json.Marshal(healthChecks) + if err != nil { + log.Printf("[ERROR] Failed marshalling platform health data: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Failed JSON parsing platform health."}`)) + return + } + + resp.WriteHeader(200) + resp.Write(healthChecksData) +} + func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (error) { baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") if len(baseUrl) == 0 { From 0645e5dc9d88f28eefd442eaf5c54c717659d74d Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Wed, 11 Oct 2023 09:12:12 +0530 Subject: [PATCH 15/15] fix: fixing go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 36b8b5e..be1b284 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/0x0elliot/shuffle-shared +module github.com/shuffle/shuffle-shared // Keep on 1.11 until AppEngine supports 1.17 or higher