Skip to content

Commit

Permalink
Merge pull request #30 from openinfradev/alert
Browse files Browse the repository at this point in the history
feature. implementation applying systemNotificationRule
  • Loading branch information
seungkyua authored Apr 8, 2024
2 parents 1864618 + c54204c commit f487860
Show file tree
Hide file tree
Showing 19 changed files with 679 additions and 569 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.sha }}

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/unittest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.sha }}

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.21

- name: Test
run: go test -v -cover ./...
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@

# Dependency directories (remove the comment below to include it)
# vendor/

cmd/server/server
cmd/server/start_dev.sh
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=linux/amd64 docker.io/library/golang:1.18-buster AS builder
FROM --platform=linux/amd64 docker.io/library/golang:1.21 AS builder

RUN mkdir -p /app
WORKDIR /app
Expand Down
13 changes: 7 additions & 6 deletions cmd/server/appgroup_status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"

"github.com/openinfradev/tks-api/pkg/domain"
Expand All @@ -17,7 +18,7 @@ func processAppGroupStatus() error {
if len(appGroups) == 0 {
return nil
}
log.Info("appGroups : ", appGroups)
log.Info(context.TODO(), "appGroups : ", appGroups)

for i := range appGroups {
appGroup := appGroups[i]
Expand All @@ -32,13 +33,13 @@ func processAppGroupStatus() error {
var newMessage string

if workflowId != "" {
workflow, err := argowfClient.GetWorkflow("argo", workflowId)
workflow, err := argowfClient.GetWorkflow(context.TODO(), "argo", workflowId)
if err != nil {
log.Error("failed to get argo workflow. err : ", err)
log.Error(context.TODO(), "failed to get argo workflow. err : ", err)
continue
}
newMessage = fmt.Sprintf("(%s) %s", workflow.Status.Progress, workflow.Status.Message)
log.Debug(fmt.Sprintf("status [%s], newMessage [%s], phase [%s]", status, newMessage, workflow.Status.Phase))
log.Debug(context.TODO(), fmt.Sprintf("status [%s], newMessage [%s], phase [%s]", status, newMessage, workflow.Status.Phase))
if status == domain.AppGroupStatus_INSTALLING {
switch workflow.Status.Phase {
case "Running":
Expand Down Expand Up @@ -70,10 +71,10 @@ func processAppGroupStatus() error {
}

if status != newStatus || statusDesc != newMessage {
log.Debug(fmt.Sprintf("update status!! appGroupId [%s], newStatus [%s], newMessage [%s]", appGroupId, newStatus, newMessage))
log.Debug(context.TODO(), fmt.Sprintf("update status!! appGroupId [%s], newStatus [%s], newMessage [%s]", appGroupId, newStatus, newMessage))
err := applicationAccessor.UpdateAppGroupStatus(appGroupId, newStatus, newMessage, workflowId)
if err != nil {
log.Error("Failed to update appgroup status err : ", err)
log.Error(context.TODO(), "Failed to update appgroup status err : ", err)
continue
}
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/server/cloud_account_status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"

"github.com/openinfradev/tks-api/pkg/domain"
Expand All @@ -16,7 +17,7 @@ func processCloudAccountStatus() error {
if len(cloudAccounts) == 0 {
return nil
}
log.Info("cloudAccounts : ", cloudAccounts)
log.Info(context.TODO(), "cloudAccounts : ", cloudAccounts)

for i := range cloudAccounts {
cloudaccount := cloudAccounts[i]
Expand All @@ -31,14 +32,14 @@ func processCloudAccountStatus() error {
var newMessage string

if workflowId != "" {
workflow, err := argowfClient.GetWorkflow("argo", workflowId)
workflow, err := argowfClient.GetWorkflow(context.TODO(), "argo", workflowId)
if err != nil {
log.Error("failed to get argo workflow. err : ", err)
log.Error(context.TODO(), "failed to get argo workflow. err : ", err)
continue
}

newMessage = fmt.Sprintf("(%s) %s", workflow.Status.Progress, workflow.Status.Message)
log.Debug(fmt.Sprintf("status [%s], newMessage [%s], phase [%s]", status, newMessage, workflow.Status.Phase))
log.Debug(context.TODO(), fmt.Sprintf("status [%s], newMessage [%s], phase [%s]", status, newMessage, workflow.Status.Phase))

if status == domain.CloudAccountStatus_CREATING {
switch workflow.Status.Phase {
Expand Down Expand Up @@ -71,17 +72,17 @@ func processCloudAccountStatus() error {
}

if status != newStatus || statusDesc != newMessage {
log.Debug(fmt.Sprintf("update status!! cloudAccountId [%s], newStatus [%s], newMessage [%s]", cloudAccountId, newStatus, newMessage))
log.Debug(context.TODO(), fmt.Sprintf("update status!! cloudAccountId [%s], newStatus [%s], newMessage [%s]", cloudAccountId, newStatus, newMessage))
err := cloudAccountAccessor.UpdateCloudAccountStatus(cloudAccountId, newStatus, newMessage, workflowId)
if err != nil {
log.Error("Failed to update cloudaccount status err : ", err)
log.Error(context.TODO(), "Failed to update cloudaccount status err : ", err)
continue
}

if newStatus == domain.CloudAccountStatus_CREATED {
err = cloudAccountAccessor.UpdateCreatedIAM(cloudAccountId, true)
if err != nil {
log.Error("Failed to update cloudaccount createdIAM err : ", err)
log.Error(context.TODO(), "Failed to update cloudaccount createdIAM err : ", err)
continue
}

Expand Down
22 changes: 10 additions & 12 deletions cmd/server/cluster_byoh.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"encoding/json"
"fmt"

Expand All @@ -21,7 +22,7 @@ func processClusterByoh() error {
if len(clusters) == 0 {
return nil
}
log.Info("byoh clusters : ", clusters)
log.Info(context.TODO(), "byoh clusters : ", clusters)

token = getTksApiToken()
if token != "" {
Expand All @@ -34,7 +35,7 @@ func processClusterByoh() error {
url := fmt.Sprintf("clusters/%s/nodes", clusterId)
body, err := apiClient.Get(url)
if err != nil {
log.Error(err)
log.Error(context.TODO(), err)
continue
}

Expand All @@ -47,25 +48,25 @@ func processClusterByoh() error {
completed = false
}
}
log.Info(out.Nodes)
log.Info(context.TODO(), out.Nodes)

//completed = true // FOR TEST
if completed {
log.Info(fmt.Sprintf("all agents registered! starting stack creation. clusterId %s", clusterId))
log.Info(context.TODO(), fmt.Sprintf("all agents registered! starting stack creation. clusterId %s", clusterId))
// clusterId, newStatus, newMessage, workflowId
if err = clusterAccessor.UpdateClusterStatus(clusterId, domain.ClusterStatus_INSTALLING, "", ""); err != nil {
log.Error("Failed to update cluster status err : ", err)
log.Error(context.TODO(), "Failed to update cluster status err : ", err)
continue
}

if cluster.IsStack {
if _, err = apiClient.Post(fmt.Sprintf("organizations/%s/stacks/%s/install", cluster.OrganizationId, clusterId), nil); err != nil {
log.Error(err)
log.Error(context.TODO(), err)
continue
}
} else {
if _, err = apiClient.Post("clusters/"+clusterId+"/install", nil); err != nil {
log.Error(err)
log.Error(context.TODO(), err)
continue
}
}
Expand All @@ -88,10 +89,7 @@ func transcode(in, out interface{}) {
}

func getTksApiToken() string {
_, err := apiClient.Post("auth/ping", domain.PingTokenRequest{
Token: token,
OrganizationId: "master",
})
_, err := apiClient.Get("auth/verify-token")
if err != nil {
body, err := apiClient.Post("auth/login", domain.LoginRequest{
AccountId: viper.GetString("tks-api-account"),
Expand All @@ -105,7 +103,7 @@ func getTksApiToken() string {
var out domain.LoginResponse
transcode(body, &out)

log.Info(out.User.Token)
log.Info(context.TODO(), out.User.Token)
token = out.User.Token
}

Expand Down
13 changes: 7 additions & 6 deletions cmd/server/cluster_status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"

"github.com/openinfradev/tks-api/pkg/domain"
Expand All @@ -16,7 +17,7 @@ func processClusterStatus() error {
if len(clusters) == 0 {
return nil
}
log.Info("clusters : ", clusters)
log.Info(context.TODO(), "clusters : ", clusters)

for i := range clusters {
cluster := clusters[i]
Expand All @@ -31,14 +32,14 @@ func processClusterStatus() error {
var newMessage string

if workflowId != "" {
workflow, err := argowfClient.GetWorkflow("argo", workflowId)
workflow, err := argowfClient.GetWorkflow(context.TODO(), "argo", workflowId)
if err != nil {
log.Error("failed to get argo workflow. err : ", err)
log.Error(context.TODO(), "failed to get argo workflow. err : ", err)
continue
}

newMessage = fmt.Sprintf("(%s) %s", workflow.Status.Progress, workflow.Status.Message)
log.Debug(fmt.Sprintf("status [%s], newMessage [%s], phase [%s]", status, newMessage, workflow.Status.Phase))
log.Debug(context.TODO(), fmt.Sprintf("status [%s], newMessage [%s], phase [%s]", status, newMessage, workflow.Status.Phase))

if status == domain.ClusterStatus_INSTALLING {
switch workflow.Status.Phase {
Expand Down Expand Up @@ -82,10 +83,10 @@ func processClusterStatus() error {
}

if status != newStatus || statusDesc != newMessage {
log.Debug(fmt.Sprintf("update status!! clusterId [%s], newStatus [%s], newMessage [%s]", clusterId, newStatus, newMessage))
log.Debug(context.TODO(), fmt.Sprintf("update status!! clusterId [%s], newStatus [%s], newMessage [%s]", clusterId, newStatus, newMessage))
err := clusterAccessor.UpdateClusterStatus(clusterId, newStatus, newMessage, workflowId)
if err != nil {
log.Error("Failed to update cluster status err : ", err)
log.Error(context.TODO(), "Failed to update cluster status err : ", err)
continue
}
}
Expand Down
45 changes: 27 additions & 18 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"time"
Expand All @@ -16,17 +17,19 @@ import (
"github.com/openinfradev/tks-batch/internal/cluster"
"github.com/openinfradev/tks-batch/internal/database"
"github.com/openinfradev/tks-batch/internal/organization"
systemNotificationRule "github.com/openinfradev/tks-batch/internal/system-notification-rule"
)

const INTERVAL_SEC = 5

var (
argowfClient argo.ArgoClient
clusterAccessor *cluster.ClusterAccessor
applicationAccessor *application.ApplicationAccessor
cloudAccountAccessor *cloudAccount.CloudAccountAccessor
organizationAccessor *organization.OrganizationAccessor
apiClient _apiClient.ApiClient
argowfClient argo.ArgoClient
clusterAccessor *cluster.ClusterAccessor
applicationAccessor *application.ApplicationAccessor
cloudAccountAccessor *cloudAccount.CloudAccountAccessor
organizationAccessor *organization.OrganizationAccessor
systemNotificationRuleAccessor *systemNotificationRule.SystemNotificationAccessor
apiClient _apiClient.ApiClient
)

func init() {
Expand All @@ -37,6 +40,7 @@ func init() {
flag.Int("tks-api-port", 9110, "server port number for tks-api")
flag.String("tks-api-account", "admin", "account name for tks-api")
flag.String("tks-api-password", "admin", "the password for tks-api account")
flag.String("kubeconfig-path", "", "path of kubeconfig. used development only!")

flag.String("dbhost", "localhost", "host of postgreSQL")
flag.String("dbport", "5432", "port of postgreSQL")
Expand All @@ -47,58 +51,63 @@ func init() {
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
flag.Parse()
if err := viper.BindPFlags(pflag.CommandLine); err != nil {
log.Error("Failed to bindFlags ", err)
log.Error(context.TODO(), "Failed to bindFlags ", err)
}

}

func main() {
log.Info("*** Arguments *** ")
log.Info(context.TODO(), "*** Arguments *** ")
for i, s := range viper.AllSettings() {
log.Info(fmt.Sprintf("%s : %v", i, s))
log.Info(context.TODO(), fmt.Sprintf("%s : %v", i, s))
}
log.Info("****************** ")
log.Info(context.TODO(), "****************** ")

// Initialize database
db, err := database.InitDB()
if err != nil {
log.Fatal("cannot connect gormDB")
log.Fatal(context.TODO(), "cannot connect gormDB")
}
clusterAccessor = cluster.New(db)
applicationAccessor = application.New(db)
cloudAccountAccessor = cloudAccount.New(db)
organizationAccessor = organization.New(db)
systemNotificationRuleAccessor = systemNotificationRule.New(db)

// initialize external clients
argowfClient, err = argo.New(viper.GetString("argo-address"), viper.GetInt("argo-port"), false, "")
if err != nil {
log.Fatal("failed to create argowf client : ", err)
log.Fatal(context.TODO(), "failed to create argowf client : ", err)
}
apiClient, err = _apiClient.New(fmt.Sprintf("%s:%d", viper.GetString("tks-api-address"), viper.GetInt("tks-api-port")))
if err != nil {
log.Fatal("failed to create tks-api client : ", err)
log.Fatal(context.TODO(), "failed to create tks-api client : ", err)
}

for {
err = processClusterStatus()
if err != nil {
log.Error(err)
log.Error(context.TODO(), err)
}
err = processAppGroupStatus()
if err != nil {
log.Error(err)
log.Error(context.TODO(), err)
}
err = processCloudAccountStatus()
if err != nil {
log.Error(err)
log.Error(context.TODO(), err)
}
err = processOrganizationStatus()
if err != nil {
log.Error(err)
log.Error(context.TODO(), err)
}
err = processClusterByoh()
if err != nil {
log.Error(err)
log.Error(context.TODO(), err)
}
err = processSystemNotificationRule()
if err != nil {
log.Error(context.TODO(), err)
}

time.Sleep(time.Second * INTERVAL_SEC)
Expand Down
Loading

0 comments on commit f487860

Please sign in to comment.