Skip to content

Commit b97507f

Browse files
committed
add the implementation of historyserver beta version
Signed-off-by: KunWuLuan <kunwuluan@gmail.com>
1 parent 44f7f17 commit b97507f

File tree

235 files changed

+2823
-697
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

235 files changed

+2823
-697
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package historyserver
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
8+
"github.com/sirupsen/logrus"
9+
10+
"k8s.io/apimachinery/pkg/runtime"
11+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
12+
"k8s.io/client-go/rest"
13+
"k8s.io/client-go/tools/clientcmd"
14+
15+
"sigs.k8s.io/controller-runtime/pkg/client"
16+
)
17+
18+
type ClientManager struct {
19+
configs []*rest.Config
20+
clients []client.Client
21+
}
22+
23+
func (c *ClientManager) ListRayClusters(ctx context.Context) ([]*rayv1.RayCluster, error) {
24+
list := []*rayv1.RayCluster{}
25+
for _, c := range c.clients {
26+
listOfRayCluster := rayv1.RayClusterList{}
27+
err := c.List(ctx, &listOfRayCluster)
28+
if err != nil {
29+
logrus.Errorf("Failed to list RayClusters: %v", err)
30+
continue
31+
}
32+
for _, rayCluster := range listOfRayCluster.Items {
33+
list = append(list, &rayCluster)
34+
}
35+
}
36+
return list, nil
37+
}
38+
39+
func NewClientManager(kubeconfigs string) *ClientManager {
40+
kubeconfigList := []*rest.Config{}
41+
if len(kubeconfigs) > 0 {
42+
stringList := strings.Split(kubeconfigs, ",")
43+
if len(stringList) > 1 {
44+
// historyserver is able to get query from live gcs, which is not safe.
45+
// we hope to replace these apis with one events.
46+
logrus.Errorf("Only one kubeconfig is supported.")
47+
}
48+
for _, kubeconfig := range stringList {
49+
if kubeconfig != "" {
50+
c, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
51+
if err != nil {
52+
logrus.Errorf("Failed to build config from kubeconfig: %v", err)
53+
continue
54+
}
55+
c.QPS = 50
56+
c.Burst = 100
57+
kubeconfigList = append(kubeconfigList, c)
58+
logrus.Infof("add config from path: %v", kubeconfig)
59+
break
60+
}
61+
}
62+
} else {
63+
c, err := rest.InClusterConfig()
64+
c.QPS = 50
65+
c.Burst = 100
66+
if err == nil {
67+
kubeconfigList = append(kubeconfigList, c)
68+
logrus.Infof("add config from in cluster config")
69+
} else {
70+
logrus.Errorf("Failed to build config from kubeconfig: %v", err)
71+
}
72+
}
73+
scheme := runtime.NewScheme()
74+
utilruntime.Must(rayv1.AddToScheme(scheme))
75+
clientList := []client.Client{}
76+
for _, config := range kubeconfigList {
77+
c, err := client.New(config, client.Options{
78+
Scheme: scheme,
79+
})
80+
if err != nil {
81+
logrus.Errorf("Failed to create client: %v", err)
82+
continue
83+
}
84+
clientList = append(clientList, c)
85+
}
86+
logrus.Infof("create client manager successfully, clients: %v", len(clientList))
87+
return &ClientManager{
88+
configs: kubeconfigList,
89+
clients: clientList,
90+
}
91+
}

historyserver/backend/historyserver/reader.go

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package historyserver
22

33
import (
4+
"context"
45
"encoding/json"
6+
"fmt"
57
"io"
68
"net/http"
79
"os"
810
"path"
911
"path/filepath"
1012
"sort"
13+
"time"
1114

1215
"github.com/emicklei/go-restful/v3"
1316
"github.com/ray-project/kuberay/historyserver/utils"
@@ -17,14 +20,83 @@ import (
1720
func (s *ServerHandler) listClusters(limit int) []utils.ClusterInfo {
1821
// 初始的继续标记
1922
logrus.Debugf("Prepare to get list clusters info ...")
23+
ctx := context.Background()
24+
liveClusters, _ := s.clientManager.ListRayClusters(ctx)
25+
liveClusterNames := []string{}
26+
liveClusterInfos := []utils.ClusterInfo{}
27+
for _, liveCluster := range liveClusters {
28+
liveClusterInfo := utils.ClusterInfo{
29+
Name: liveCluster.Name,
30+
Namespace: liveCluster.Namespace,
31+
CreateTime: liveCluster.CreationTimestamp.String(),
32+
CreateTimeStamp: liveCluster.CreationTimestamp.Unix(),
33+
SessionName: "live",
34+
}
35+
liveClusterInfos = append(liveClusterInfos, liveClusterInfo)
36+
liveClusterNames = append(liveClusterNames, liveCluster.Name)
37+
}
38+
logrus.Infof("live clusters: %v", liveClusterNames)
2039
clusters := s.reader.List()
2140
sort.Sort(utils.ClusterInfoList(clusters))
2241
if limit > 0 {
2342
clusters = clusters[:limit]
2443
}
44+
clusters = append(liveClusterInfos, clusters...)
2545
return clusters
2646
}
2747

48+
func (s *ServerHandler) _getNodeLogs(rayClusterNameID, sessionId, nodeId, dir string) ([]byte, error) {
49+
logPath := path.Join(sessionId, "logs", nodeId)
50+
if dir != "" {
51+
logPath = path.Join(logPath, dir)
52+
}
53+
files := s.reader.ListFiles(rayClusterNameID, logPath)
54+
ret := map[string]interface{}{
55+
"data": map[string]interface{}{
56+
"result": map[string]interface{}{
57+
"padding": files,
58+
},
59+
},
60+
}
61+
return json.Marshal(ret)
62+
}
63+
64+
func (s *ServerHandler) GetNodes(rayClusterNameID, sessionId string) ([]byte, error) {
65+
logPath := path.Join(sessionId, "logs")
66+
nodes := s.reader.ListFiles(rayClusterNameID, logPath)
67+
templ := map[string]interface{}{
68+
"result": true,
69+
"msg": "Node summary fetched.",
70+
"data": map[string]interface{}{
71+
"summary": []map[string]interface{}{},
72+
},
73+
}
74+
nodeSummary := []map[string]interface{}{}
75+
for _, node := range nodes {
76+
nodeSummary = append(nodeSummary, map[string]interface{}{
77+
"raylet": map[string]interface{}{
78+
"nodeId": path.Clean(node),
79+
"state": "ALIVE",
80+
},
81+
"ip": "UNKNOWN",
82+
})
83+
}
84+
templ["data"].(map[string]interface{})["summary"] = nodeSummary
85+
return json.Marshal(templ)
86+
}
87+
88+
func (s *ServerHandler) ClusterInfo(rayClusterNameID string) []byte {
89+
templ := `{
90+
"result": true,
91+
"msg": "Got formatted cluster status.",
92+
"data": {
93+
"clusterStatus": "======== Autoscaler status: %f ========\nNode status\n---------------------------------------------------------------\nActive:\n (no active nodes)\nIdle:\n 0 headgroup\nPending:\n (no pending nodes)\nRecent failures:\n (no failures)\n\nResources\n---------------------------------------------------------------\nTotal Usage:\n 0B/0B memory\n 0B/0B object_store_memory\n\nFrom request_resources:\n (none)\nPending Demands:\n (no resource demands)"
94+
}
95+
}`
96+
afterRender := fmt.Sprintf(templ, time.Now().Format("2006-01-02 15:04:05.000000"))
97+
return []byte(afterRender)
98+
}
99+
28100
func (s *ServerHandler) MetaKeyInfo(rayClusterNameID, key string) []byte {
29101
baseObject := path.Join(utils.GetMetaDirByNameID(s.rootDir, rayClusterNameID), key)
30102
logrus.Infof("Prepare to get object %s info ...", baseObject)
@@ -37,8 +109,8 @@ func (s *ServerHandler) MetaKeyInfo(rayClusterNameID, key string) []byte {
37109
return data
38110
}
39111

40-
func (s *ServerHandler) LogKeyInfo(rayClusterNameID, nodeID, key string, lines int64) []byte {
41-
baseObject := path.Join(utils.GetLogDirByNameID(s.rootDir, rayClusterNameID, nodeID), key)
112+
func (s *ServerHandler) LogKeyInfo(rayClusterNameID, nodeID, sessionId, key string, lines int64) []byte {
113+
baseObject := path.Join(utils.GetLogDirByNameID(s.rootDir, rayClusterNameID, nodeID, sessionId), key)
42114
logrus.Infof("Prepare to get object %s info ...", baseObject)
43115
body := s.reader.GetContent(rayClusterNameID, baseObject)
44116
data, err := io.ReadAll(body)
@@ -56,8 +128,22 @@ func (s *ServerHandler) staticFileHandler(req *restful.Request, resp *restful.Re
56128
// Get the path parameter
57129
path := req.PathParameter("path")
58130

131+
isHomePage := true
132+
_, err := req.Request.Cookie(COOKIE_CLUSTER_NAME_KEY)
133+
isHomePage = err != nil
134+
prefix := ""
135+
if isHomePage {
136+
prefix = "homepage"
137+
} else {
138+
version := "v2.51.0"
139+
if versionCookie, err := req.Request.Cookie(COOKIE_DASHBOARD_VERSION_KEY); err == nil {
140+
version = versionCookie.Value
141+
}
142+
prefix = version + "/client/build"
143+
}
144+
59145
// Construct the full path to the static directory
60-
fullPath := filepath.Join(s.dashboardDir, "static", path)
146+
fullPath := filepath.Join(s.dashboardDir, prefix, "static", path)
61147
logrus.Infof("staticFileHandler fullpath %s", fullPath)
62148

63149
// Check if the full path exists

0 commit comments

Comments
 (0)