Skip to content

Commit 3073e7f

Browse files
committed
added go routine, added sync status on/off api, updated frontend for sync on/off
1 parent 8fed7ee commit 3073e7f

20 files changed

+670
-328
lines changed

app_sync.db

12 KB
Binary file not shown.

cmd/api/app/routes/metrics/handler.go

Lines changed: 133 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,16 @@ import (
88
"path/filepath"
99
"strings"
1010
"encoding/json"
11-
"time"
11+
1212
v1 "github.com/karmada-io/dashboard/cmd/api/app/types/api/v1"
1313
"github.com/gin-gonic/gin"
1414
"github.com/karmada-io/dashboard/cmd/api/app/router"
1515
"github.com/karmada-io/dashboard/pkg/client"
1616
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1818
kubeclient "k8s.io/client-go/kubernetes"
19-
"k8s.io/client-go/tools/clientcmd"
19+
"k8s.io/client-go/tools/clientcmd"
2020
"sync"
21-
"log"
2221
)
2322

2423
const (
@@ -31,7 +30,7 @@ const (
3130
controllerManagerPort = "8080"
3231
)
3332

34-
func fetchMetrics(appName string, queryType string, requests chan saveRequest) (map[string]*v1.ParsedData, []string, error) {
33+
func fetchMetrics(appName string, requests chan saveRequest) (map[string]*v1.ParsedData, []string, error) {
3534
kubeClient := client.InClusterClient()
3635
podsMap, errors := getKarmadaPods(appName)
3736
if len(podsMap) == 0 && len(errors) > 0 {
@@ -101,18 +100,133 @@ func fetchMetrics(appName string, queryType string, requests chan saveRequest) (
101100
return allMetrics, errors, nil
102101
}
103102

104-
105-
106103
func getMetrics(c *gin.Context) {
107104
appName := c.Param("app_name")
108105
queryType := c.Query("type")
109106

107+
if queryType == "sync_on" || queryType == "sync_off" {
108+
syncValue := 0
109+
if queryType == "sync_on" {
110+
syncValue = 1
111+
}
112+
113+
if appName == "" {
114+
// Stop all apps
115+
_, err := db.Exec("UPDATE app_sync SET sync_trigger = ?", syncValue)
116+
if err != nil {
117+
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Failed to update sync_trigger for all apps: %v", err)})
118+
return
119+
}
120+
121+
// Cancel all existing contexts and create new ones if turning on
122+
contextMutex.Lock()
123+
for app := range appContexts {
124+
currentSyncValue, _ := syncMap.Load(app)
125+
if currentSyncValue == syncValue {
126+
continue // Skip if already in the desired state
127+
}
128+
129+
if cancel, exists := appCancelFuncs[app]; exists {
130+
cancel() // Cancel existing context
131+
}
132+
133+
if syncValue == 1 {
134+
// Create new context if turning on
135+
ctx, cancel := context.WithCancel(context.Background())
136+
appContexts[app] = ctx
137+
appCancelFuncs[app] = cancel
138+
go startAppMetricsFetcher(app)
139+
}
140+
141+
syncMap.Store(app, syncValue)
142+
}
143+
contextMutex.Unlock()
144+
145+
message := "Sync trigger updated successfully for all apps"
146+
if syncValue == 1 {
147+
message = "Sync turned on successfully for all apps"
148+
} else {
149+
message = "Sync turned off successfully for all apps"
150+
}
151+
c.JSON(http.StatusOK, gin.H{"message": message})
152+
} else {
153+
// Update specific app
154+
currentSyncValue, _ := syncMap.Load(appName)
155+
if currentSyncValue == syncValue {
156+
message := fmt.Sprintf("Sync is already %s for %s", queryType, appName)
157+
c.JSON(http.StatusOK, gin.H{"message": message})
158+
return
159+
}
160+
161+
_, err := db.Exec("UPDATE app_sync SET sync_trigger = ? WHERE app_name = ?", syncValue, appName)
162+
if err != nil {
163+
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Failed to update sync_trigger: %v", err)})
164+
return
165+
}
166+
167+
contextMutex.Lock()
168+
if cancel, exists := appCancelFuncs[appName]; exists {
169+
cancel() // Cancel existing context
170+
}
171+
172+
if syncValue == 1 {
173+
// Create new context if turning on
174+
ctx, cancel := context.WithCancel(context.Background())
175+
appContexts[appName] = ctx
176+
appCancelFuncs[appName] = cancel
177+
go startAppMetricsFetcher(appName)
178+
}
179+
180+
syncMap.Store(appName, syncValue)
181+
contextMutex.Unlock()
182+
183+
var message string
184+
if syncValue == 1 {
185+
message = fmt.Sprintf("Sync turned on successfully for %s", appName)
186+
} else {
187+
message = fmt.Sprintf("Sync turned off successfully for %s", appName)
188+
}
189+
c.JSON(http.StatusOK, gin.H{"message": message})
190+
}
191+
192+
return
193+
}
194+
110195
if queryType == "metricsdetails" {
111196
queryMetrics(c)
112197
return
113198
}
114199

115-
allMetrics, errors, err := fetchMetrics(appName, queryType, requests)
200+
if queryType == "sync_status" {
201+
statusMap := make(map[string]bool)
202+
203+
// Get status for all registered apps
204+
for _, app := range []string{
205+
karmadaScheduler,
206+
karmadaControllerManager,
207+
karmadaAgent,
208+
karmadaSchedulerEstimator + "-member1",
209+
karmadaSchedulerEstimator + "-member2",
210+
karmadaSchedulerEstimator + "-member3",
211+
} {
212+
syncValue, exists := syncMap.Load(app)
213+
if !exists {
214+
statusMap[app] = false
215+
continue
216+
}
217+
218+
if value, ok := syncValue.(int); ok {
219+
statusMap[app] = value == 1
220+
} else {
221+
statusMap[app] = false
222+
}
223+
}
224+
225+
c.JSON(http.StatusOK, statusMap)
226+
return
227+
}
228+
229+
allMetrics, errors, err := fetchMetrics(appName, requests)
116230
if err != nil {
117231
c.JSON(http.StatusInternalServerError, gin.H{"errors": errors, "error": err.Error()})
118232
return
@@ -125,7 +239,6 @@ func getMetrics(c *gin.Context) {
125239
}
126240

127241

128-
129242
func getKarmadaAgentMetrics(podName string, clusterName string, requests chan saveRequest) (*v1.ParsedData, error) {
130243
kubeClient := client.InClusterKarmadaClient()
131244
clusters, err := kubeClient.ClusterV1alpha1().Clusters().List(context.TODO(), metav1.ListOptions{})
@@ -201,16 +314,14 @@ func getKarmadaAgentMetrics(podName string, clusterName string, requests chan sa
201314
data: parsedData,
202315
result: nil, // Not waiting for result
203316
}
204-
// if err := <-resultChan; err != nil {
205-
// return nil, fmt.Errorf("failed to save metrics to DB: %v", err)
206-
// }
207-
208317
return parsedData, nil
209318
}
319+
210320
func isJSON(data []byte) bool {
211321
var js json.RawMessage
212322
return json.Unmarshal(data, &js) == nil
213323
}
324+
214325
func getClusterPods(cluster *v1alpha1.Cluster) ([]v1.PodInfo, error) {
215326
fmt.Printf("Getting pods for cluster: %s\n", cluster.Name)
216327

@@ -292,55 +403,23 @@ func getKarmadaPods(appName string) (map[string][]v1.PodInfo, []string) {
292403

293404
return podsMap, errors
294405
}
295-
var (requests chan saveRequest)
296406

297-
func startAppMetricsFetcher(appName string) {
298-
ticker := time.NewTicker(1 * time.Second)
299-
defer ticker.Stop()
300-
for range ticker.C {
301-
fmt.Printf("Fetching metrics for %s at %s\n", appName, time.Now().Format(time.RFC3339))
302-
go func() {
303-
_, errors, err := fetchMetrics(appName, "", requests)
304-
if err != nil {
305-
log.Printf("Error fetching metrics for %s: %v, errors: %v\n", appName, err, errors)
306-
} else {
307-
log.Printf("Successfully fetched metrics for %s\n", appName)
308-
}
309-
}()
407+
func init() {
408+
goroutine()
409+
// Initialize the router with modified endpoints
410+
r := router.V1()
411+
r.GET("/metrics", getMetrics)
412+
r.GET("/metrics/:app_name", getMetrics)
413+
r.GET("/metrics/:app_name/:pod_name", queryMetrics)
310414
}
311-
}
312415

313-
func init() {
314-
// Initialize the application names
315-
appNames := []string{
316-
karmadaScheduler,
317-
karmadaControllerManager,
318-
karmadaAgent,
319-
karmadaSchedulerEstimator + "-member1",
320-
karmadaSchedulerEstimator + "-member2",
321-
karmadaSchedulerEstimator + "-member3",
322-
}
323416

324-
// Initialize the request channel
325-
requests = make(chan saveRequest, len(appNames)) // Buffered channel
326-
327-
// Start the database worker
328-
go startDatabaseWorker(requests)
329-
330-
// Start the per-app periodic metrics fetcher
331-
for _, app := range appNames {
332-
go startAppMetricsFetcher(app)
333-
}
334-
335-
// Initialize the router
336-
r := router.V1()
337-
r.GET("/metrics/:app_name", getMetrics)
338-
r.GET("/metrics/:app_name/:pod_name", queryMetrics)
339417
// http://localhost:8000/api/v1/metrics/karmada-scheduler //from terminal
340418

341419
// http://localhost:8000/api/v1/metrics/karmada-scheduler?type=metricsdetails //from sqlite details bar
342420

343421
// http://localhost:8000/api/v1/metrics/karmada-scheduler/karmada-scheduler-7bd4659f9f-hh44f?type=details&mname=workqueue_queue_duration_seconds
344422

345-
// all metrics details
346-
}
423+
// http://localhost:8000/api/v1/metrics?type=sync_off // to skip all metrics
424+
425+
// http://localhost:8000/api/v1/metrics/karmada-scheduler?type=sync_off // to skip specific metrics

cmd/api/app/routes/metrics/handlerqueries.go

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,51 @@ import (
66
"log"
77
"net/http"
88
"strings"
9+
"sync"
910
"time"
1011

1112
v1 "github.com/karmada-io/dashboard/cmd/api/app/types/api/v1"
1213
"github.com/gin-gonic/gin"
13-
_ "modernc.org/sqlite"
1414
)
1515

16+
var (
17+
dbMap = make(map[string]*sql.DB)
18+
dbMapLock sync.RWMutex
19+
)
20+
21+
// getDB returns an existing database connection or creates a new one
22+
func getDB(appName string) (*sql.DB, error) {
23+
sanitizedAppName := strings.ReplaceAll(appName, "-", "_")
24+
25+
dbMapLock.RLock()
26+
db, exists := dbMap[sanitizedAppName]
27+
dbMapLock.RUnlock()
28+
29+
if exists {
30+
return db, nil
31+
}
32+
33+
dbMapLock.Lock()
34+
defer dbMapLock.Unlock()
35+
36+
// Double-check after acquiring write lock
37+
if db, exists := dbMap[sanitizedAppName]; exists {
38+
return db, nil
39+
}
40+
41+
db, err := sql.Open("sqlite", fmt.Sprintf("file:%s.db?cache=shared&mode=rwc", sanitizedAppName))
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
// Set connection pool settings
47+
db.SetMaxOpenConns(1) // Restrict to 1 connection to prevent lock conflicts
48+
db.SetMaxIdleConns(1)
49+
50+
dbMap[sanitizedAppName] = db
51+
return db, nil
52+
}
53+
1654
func queryMetrics(c *gin.Context) {
1755
appName := c.Param("app_name")
1856
podName := c.Param("pod_name")
@@ -22,17 +60,25 @@ func queryMetrics(c *gin.Context) {
2260
sanitizedAppName := strings.ReplaceAll(appName, "-", "_")
2361
sanitizedPodName := strings.ReplaceAll(podName, "-", "_")
2462

25-
db, err := sql.Open("sqlite", sanitizedAppName+".db")
63+
db, err := getDB(sanitizedAppName)
2664
if err != nil {
27-
log.Printf("Error opening database: %v", err)
65+
log.Printf("Error getting database connection: %v", err)
2866
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to open database"})
2967
return
3068
}
31-
defer db.Close()
69+
70+
// Add transaction for consistent reads
71+
tx, err := db.Begin()
72+
if err != nil {
73+
log.Printf("Error starting transaction: %v", err)
74+
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to start transaction"})
75+
return
76+
}
77+
defer tx.Rollback()
3278

3379
switch queryType {
3480
case "mname":
35-
rows, err := db.Query(fmt.Sprintf("SELECT DISTINCT name FROM %s", sanitizedPodName))
81+
rows, err := tx.Query(fmt.Sprintf("SELECT DISTINCT name FROM %s", sanitizedPodName))
3682
if err != nil {
3783
log.Printf("Error querying metric names: %v, SQL Error: %v", err, err)
3884
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to query metric names"})
@@ -69,7 +115,7 @@ func queryMetrics(c *gin.Context) {
69115
INNER JOIN %s_values v ON m.id = v.metric_id
70116
WHERE m.name = ?
71117
`, sanitizedPodName, sanitizedPodName)
72-
rows, err := db.Query(query, metricName)
118+
rows, err := tx.Query(query, metricName)
73119
if err != nil {
74120
log.Printf("Error querying metric details: %v", err)
75121
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to query metric details"})
@@ -101,7 +147,7 @@ func queryMetrics(c *gin.Context) {
101147
}
102148

103149
labelsQuery := fmt.Sprintf("SELECT key, value FROM %s_labels WHERE value_id = ?", sanitizedPodName)
104-
labelsRows, err := db.Query(labelsQuery, valueID)
150+
labelsRows, err := tx.Query(labelsQuery, valueID)
105151
if err != nil {
106152
log.Printf("Error querying labels: %v", err)
107153
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to query labels"})

0 commit comments

Comments
 (0)