Skip to content

Commit

Permalink
1. 日志支持打印具体文件行数;2. GetClusterNodesInfo当查询所有节点时,可以通过一行命令解析,解析代码属于CPU密集…
Browse files Browse the repository at this point in the history
…型的,没必要用协程计算
  • Loading branch information
283713406 committed Sep 6, 2024
1 parent d1f53ad commit 9843838
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 24 deletions.
34 changes: 32 additions & 2 deletions caller/caller.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package caller

import (
"bytes"
"database/sql"
"fmt"
"io"
"log"
"os"
"scow-slurm-adapter/utils"
"path/filepath"

_ "github.com/go-sql-driver/mysql"
"github.com/sirupsen/logrus"
"gopkg.in/natefinch/lumberjack.v2"
"scow-slurm-adapter/utils"
)

var (
Expand All @@ -18,6 +21,32 @@ var (
Logger *logrus.Logger
)

// LogFormatter is a logrus formatter that renders each entry as one
// bracketed plain-text line terminated by a newline.
type LogFormatter struct{}

// Format serializes entry as "[time] [level] message", inserting a
// "[file:line function]" segment before the message when the entry carries
// caller information. The line is written into entry.Buffer when one is
// provided, otherwise into a freshly allocated buffer. The returned error is
// always nil.
func (m *LogFormatter) Format(entry *logrus.Entry) ([]byte, error) {
	// Reuse the buffer supplied with the entry when there is one.
	out := entry.Buffer
	if out == nil {
		out = &bytes.Buffer{}
	}

	ts := entry.Time.Format("2006-01-02 15:04:05")

	// Caller details are only available when HasCaller() reports true.
	if !entry.HasCaller() {
		fmt.Fprintf(out, "[%s] [%s] %s\n", ts, entry.Level, entry.Message)
		return out.Bytes(), nil
	}

	// Only the base name of the source file is printed, not its full path.
	src := filepath.Base(entry.Caller.File)
	fmt.Fprintf(out, "[%s] [%s] [%s:%d %s] %s\n",
		ts, entry.Level, src, entry.Caller.Line, entry.Caller.Function, entry.Message)
	return out.Bytes(), nil
}

func init() {
currentPwd, _ := os.Getwd()
ConfigValue = utils.ParseConfig(currentPwd + "/" + utils.DefaultConfigPath)
Expand All @@ -44,8 +73,9 @@ func initDB() {

func initLogger() {
Logger = logrus.New()
Logger.SetReportCaller(true)
// Set the log output format: the custom LogFormatter (plain text, no longer JSON)
Logger.SetFormatter(&logrus.JSONFormatter{})
Logger.SetFormatter(&LogFormatter{})
// 设置日志级别为Info
Logger.SetLevel(logrus.InfoLevel)

Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"scow-slurm-adapter/caller"
pb "scow-slurm-adapter/gen/go"

"scow-slurm-adapter/services/account"
"scow-slurm-adapter/services/app"
"scow-slurm-adapter/services/config"
Expand Down
44 changes: 23 additions & 21 deletions services/config/config.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package config

import (
"bufio"
"context"
"fmt"
"scow-slurm-adapter/caller"
pb "scow-slurm-adapter/gen/go"
"scow-slurm-adapter/utils"
"strconv"
"strings"
"sync"
Expand All @@ -14,6 +12,9 @@ import (
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"scow-slurm-adapter/caller"
pb "scow-slurm-adapter/gen/go"
"scow-slurm-adapter/utils"
)

type ServerConfig struct {
Expand Down Expand Up @@ -731,7 +732,7 @@ func (s *ServerConfig) GetAvailablePartitions(ctx context.Context, in *pb.GetAva
return &pb.GetAvailablePartitionsResponse{Partitions: parts}, nil
}

func extractNodeInfo(info string) (*pb.NodeInfo, error) {
func extractNodeInfo(info string) *pb.NodeInfo {
var (
partitionList []string
totalGpusInt int
Expand Down Expand Up @@ -799,7 +800,7 @@ func extractNodeInfo(info string) (*pb.NodeInfo, error) {
GpuCount: uint32(totalGpusInt),
AllocGpuCount: uint32(allocGpusInt),
IdleGpuCount: uint32(totalGpusInt) - uint32(allocGpusInt),
}, nil
}
}

func getNodeInfo(node string, wg *sync.WaitGroup, nodeChan chan<- *pb.NodeInfo, errChan chan<- error) {
Expand All @@ -817,28 +818,23 @@ func getNodeInfo(node string, wg *sync.WaitGroup, nodeChan chan<- *pb.NodeInfo,
return
}

nodeInfo, err := extractNodeInfo(info)
if err != nil {
errChan <- err
return
}
nodeInfo := extractNodeInfo(info)

nodeChan <- nodeInfo
}

func (s *ServerConfig) GetClusterNodesInfo(ctx context.Context, in *pb.GetClusterNodesInfoRequest) (*pb.GetClusterNodesInfoResponse, error) {
var (
wg sync.WaitGroup
nodesInfo []*pb.NodeInfo
nodesInfoList []string
wg sync.WaitGroup
nodesInfo []*pb.NodeInfo
)
caller.Logger.Infof("Received request GetClusterNodesInfo: %v", in)
nodeChan := make(chan *pb.NodeInfo, len(in.NodeNames))
errChan := make(chan error, len(in.NodeNames))

if len(in.NodeNames) == 0 {
// 获取集群中全部节点的信息
getNodesInfoCmd := "scontrol show nodes --oneliner | grep Partitions | awk '{print $1}' | awk -F= '{print $2}' | tr '\n' ';'" // 获取全部计算节点主机名
getNodesInfoCmd := "scontrol show nodes --oneliner | grep Partitions" // 获取全部计算节点主机名
output, err := utils.RunCommand(getNodesInfoCmd)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Expand All @@ -848,17 +844,22 @@ func (s *ServerConfig) GetClusterNodesInfo(ctx context.Context, in *pb.GetCluste
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
nodesInfoList = strings.Split(output, ";")
nodesInfoList = nodesInfoList[:len(nodesInfoList)-1]
} else {
nodesInfoList = in.NodeNames
// 按行分割输出
scanner := bufio.NewScanner(strings.NewReader(output))
for scanner.Scan() {
line := scanner.Text()
nodeInfo := extractNodeInfo(line)
nodesInfo = append(nodesInfo, nodeInfo)
}
caller.Logger.Infof("GetClusterNodesInfoResponse: %v", nodesInfo)
return &pb.GetClusterNodesInfoResponse{Nodes: nodesInfo}, nil
}

for _, node := range nodesInfoList {
node1 := node
for _, node := range in.NodeNames {
nodeName := node
wg.Add(1)
go func() {
getNodeInfo(node1, &wg, chan<- *pb.NodeInfo(nodeChan), chan<- error(errChan))
getNodeInfo(nodeName, &wg, chan<- *pb.NodeInfo(nodeChan), chan<- error(errChan))
}()
}

Expand All @@ -879,6 +880,7 @@ func (s *ServerConfig) GetClusterNodesInfo(ctx context.Context, in *pb.GetCluste
}
default:
}
caller.Logger.Infof("GetClusterNodesInfoResponse: %v", nodesInfo)
return &pb.GetClusterNodesInfoResponse{Nodes: nodesInfo}, nil
}

Expand Down

0 comments on commit 9843838

Please sign in to comment.