Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1. 日志支持打印具体文件行数;2. GetClusterNodesInfo当查询所有节点时,可以通过一行命令解析,解析代码属于CPU密集… #16

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions caller/caller.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package caller

import (
"bytes"
"database/sql"
"fmt"
"io"
"log"
"os"
"scow-slurm-adapter/utils"
"path/filepath"

_ "github.com/go-sql-driver/mysql"
"github.com/sirupsen/logrus"
"gopkg.in/natefinch/lumberjack.v2"
"scow-slurm-adapter/utils"
)

var (
Expand All @@ -18,6 +21,32 @@ var (
Logger *logrus.Logger
)

// LogFormatter is a logrus formatter that renders every entry as one
// bracketed plain-text line terminated by a newline.
type LogFormatter struct{}

// Format serializes entry into a single line.
//
// When the entry carries caller information the layout is
//
//	[timestamp] [level] [file:line function] message
//
// where file is only the base name of the caller's source file. Without
// caller information the layout is
//
//	[timestamp] [level] message
//
// The timestamp is entry.Time formatted as "2006-01-02 15:04:05". The line is
// written into entry.Buffer when one is set, otherwise into a freshly
// allocated buffer. The returned error is always nil.
func (m *LogFormatter) Format(entry *logrus.Entry) ([]byte, error) {
	// Write into the entry's own buffer when it has one; allocate otherwise.
	out := entry.Buffer
	if out == nil {
		out = new(bytes.Buffer)
	}

	ts := entry.Time.Format("2006-01-02 15:04:05")

	// Caller details are only present when HasCaller() reports true.
	if !entry.HasCaller() {
		fmt.Fprintf(out, "[%s] [%s] %s\n", ts, entry.Level, entry.Message)
		return out.Bytes(), nil
	}

	frame := entry.Caller
	fmt.Fprintf(out, "[%s] [%s] [%s:%d %s] %s\n",
		ts, entry.Level, filepath.Base(frame.File), frame.Line, frame.Function, entry.Message)
	return out.Bytes(), nil
}

func init() {
currentPwd, _ := os.Getwd()
ConfigValue = utils.ParseConfig(currentPwd + "/" + utils.DefaultConfigPath)
Expand All @@ -44,8 +73,9 @@ func initDB() {

func initLogger() {
Logger = logrus.New()
Logger.SetReportCaller(true)
// 设置日志输出格式为JSON
Logger.SetFormatter(&logrus.JSONFormatter{})
Logger.SetFormatter(&LogFormatter{})
// 设置日志级别为Info
Logger.SetLevel(logrus.InfoLevel)

Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"scow-slurm-adapter/caller"
pb "scow-slurm-adapter/gen/go"

"scow-slurm-adapter/services/account"
"scow-slurm-adapter/services/app"
"scow-slurm-adapter/services/config"
Expand Down
44 changes: 23 additions & 21 deletions services/config/config.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package config

import (
"bufio"
"context"
"fmt"
"scow-slurm-adapter/caller"
pb "scow-slurm-adapter/gen/go"
"scow-slurm-adapter/utils"
"strconv"
"strings"
"sync"
Expand All @@ -14,6 +12,9 @@ import (
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"scow-slurm-adapter/caller"
pb "scow-slurm-adapter/gen/go"
"scow-slurm-adapter/utils"
)

type ServerConfig struct {
Expand Down Expand Up @@ -731,7 +732,7 @@ func (s *ServerConfig) GetAvailablePartitions(ctx context.Context, in *pb.GetAva
return &pb.GetAvailablePartitionsResponse{Partitions: parts}, nil
}

func extractNodeInfo(info string) (*pb.NodeInfo, error) {
func extractNodeInfo(info string) *pb.NodeInfo {
var (
partitionList []string
totalGpusInt int
Expand Down Expand Up @@ -799,7 +800,7 @@ func extractNodeInfo(info string) (*pb.NodeInfo, error) {
GpuCount: uint32(totalGpusInt),
AllocGpuCount: uint32(allocGpusInt),
IdleGpuCount: uint32(totalGpusInt) - uint32(allocGpusInt),
}, nil
}
}

func getNodeInfo(node string, wg *sync.WaitGroup, nodeChan chan<- *pb.NodeInfo, errChan chan<- error) {
Expand All @@ -817,28 +818,23 @@ func getNodeInfo(node string, wg *sync.WaitGroup, nodeChan chan<- *pb.NodeInfo,
return
}

nodeInfo, err := extractNodeInfo(info)
if err != nil {
errChan <- err
return
}
nodeInfo := extractNodeInfo(info)

nodeChan <- nodeInfo
}

func (s *ServerConfig) GetClusterNodesInfo(ctx context.Context, in *pb.GetClusterNodesInfoRequest) (*pb.GetClusterNodesInfoResponse, error) {
var (
wg sync.WaitGroup
nodesInfo []*pb.NodeInfo
nodesInfoList []string
wg sync.WaitGroup
nodesInfo []*pb.NodeInfo
)
caller.Logger.Infof("Received request GetClusterNodesInfo: %v", in)
nodeChan := make(chan *pb.NodeInfo, len(in.NodeNames))
errChan := make(chan error, len(in.NodeNames))

if len(in.NodeNames) == 0 {
// 获取集群中全部节点的信息
getNodesInfoCmd := "scontrol show nodes --oneliner | grep Partitions | awk '{print $1}' | awk -F= '{print $2}' | tr '\n' ';'" // 获取全部计算节点主机名
getNodesInfoCmd := "scontrol show nodes --oneliner | grep Partitions" // 获取全部计算节点主机名
output, err := utils.RunCommand(getNodesInfoCmd)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Expand All @@ -848,17 +844,22 @@ func (s *ServerConfig) GetClusterNodesInfo(ctx context.Context, in *pb.GetCluste
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
nodesInfoList = strings.Split(output, ";")
nodesInfoList = nodesInfoList[:len(nodesInfoList)-1]
} else {
nodesInfoList = in.NodeNames
// 按行分割输出
scanner := bufio.NewScanner(strings.NewReader(output))
for scanner.Scan() {
line := scanner.Text()
nodeInfo := extractNodeInfo(line)
nodesInfo = append(nodesInfo, nodeInfo)
}
caller.Logger.Infof("GetClusterNodesInfoResponse: %v", nodesInfo)
return &pb.GetClusterNodesInfoResponse{Nodes: nodesInfo}, nil
}

for _, node := range nodesInfoList {
node1 := node
for _, node := range in.NodeNames {
nodeName := node
wg.Add(1)
go func() {
getNodeInfo(node1, &wg, chan<- *pb.NodeInfo(nodeChan), chan<- error(errChan))
getNodeInfo(nodeName, &wg, chan<- *pb.NodeInfo(nodeChan), chan<- error(errChan))
}()
}

Expand All @@ -879,6 +880,7 @@ func (s *ServerConfig) GetClusterNodesInfo(ctx context.Context, in *pb.GetCluste
}
default:
}
caller.Logger.Infof("GetClusterNodesInfoResponse: %v", nodesInfo)
return &pb.GetClusterNodesInfoResponse{Nodes: nodesInfo}, nil
}

Expand Down
Loading