Skip to content

Commit

Permalink
feat: refactor grpc keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
naiba committed Dec 4, 2024
1 parent 68e3bb0 commit be51c5a
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 152 deletions.
2 changes: 1 addition & 1 deletion cmd/dashboard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func main() {

singleton.CleanServiceHistory()
serviceSentinelDispatchBus := make(chan model.Service) // 用于传递服务监控任务信息的channel
rpc.DispatchKeepalive()
go rpc.DispatchTask(serviceSentinelDispatchBus)
go rpc.DispatchKeepalive()
go singleton.AlertSentinelStart()
singleton.NewServiceSentinel(serviceSentinelDispatchBus)

Expand Down
7 changes: 1 addition & 6 deletions cmd/dashboard/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,19 @@ func DispatchTask(serviceSentinelDispatchBus <-chan model.Service) {
workedServerIndex++
continue
}
// 找到合适机器执行任务,跳出循环
// singleton.SortedServerList[workedServerIndex].TaskStream.Send(task.PB())
// workedServerIndex++
// break
}
singleton.SortedServerLock.RUnlock()
}
}

func DispatchKeepalive() {
singleton.Cron.AddFunc("@every 60s", func() {
singleton.Cron.AddFunc("@every 30s", func() {
singleton.SortedServerLock.RLock()
defer singleton.SortedServerLock.RUnlock()
for i := 0; i < len(singleton.SortedServerList); i++ {
if singleton.SortedServerList[i] == nil || singleton.SortedServerList[i].TaskStream == nil {
continue
}

singleton.SortedServerList[i].TaskStream.Send(&proto.Task{Type: model.TaskTypeKeepalive})
}
})
Expand Down
1 change: 0 additions & 1 deletion model/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func execCase(t *testing.T, item testSt) {
CountryCode: "",
},
LastActive: time.Time{},
TaskClose: nil,
TaskStream: nil,
PrevTransferInSnapshot: 0,
PrevTransferOutSnapshot: 0,
Expand Down
7 changes: 1 addition & 6 deletions model/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package model

import (
"log"
"sync"
"time"

"gorm.io/gorm"
Expand Down Expand Up @@ -30,9 +29,7 @@ type Server struct {
GeoIP *GeoIP `gorm:"-" json:"geoip,omitempty"`
LastActive time.Time `gorm:"-" json:"last_active,omitempty"`

TaskClose chan error `gorm:"-" json:"-"`
TaskCloseLock *sync.Mutex `gorm:"-" json:"-"`
TaskStream pb.NezhaService_RequestTaskServer `gorm:"-" json:"-"`
TaskStream pb.NezhaService_RequestTaskServer `gorm:"-" json:"-"`

PrevTransferInSnapshot int64 `gorm:"-" json:"-"` // 上次数据点时的入站使用量
PrevTransferOutSnapshot int64 `gorm:"-" json:"-"` // 上次数据点时的出站使用量
Expand All @@ -43,8 +40,6 @@ func (s *Server) CopyFromRunningServer(old *Server) {
s.State = old.State
s.GeoIP = old.GeoIP
s.LastActive = old.LastActive
s.TaskClose = old.TaskClose
s.TaskCloseLock = old.TaskCloseLock
s.TaskStream = old.TaskStream
s.PrevTransferInSnapshot = old.PrevTransferInSnapshot
s.PrevTransferOutSnapshot = old.PrevTransferOutSnapshot
Expand Down
2 changes: 1 addition & 1 deletion model/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,5 @@ func (m *Service) AfterFind(tx *gorm.DB) error {

// IsServiceSentinelNeeded 判断该任务类型是否需要进行服务监控 需要则返回true
func IsServiceSentinelNeeded(t uint64) bool {
return t != TaskTypeCommand && t != TaskTypeTerminalGRPC && t != TaskTypeUpgrade
return t != TaskTypeCommand && t != TaskTypeTerminalGRPC && t != TaskTypeUpgrade && t != TaskTypeKeepalive
}
50 changes: 23 additions & 27 deletions proto/nezha.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions proto/nezha.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ package proto;
service NezhaService {
rpc ReportSystemState(stream State) returns (stream Receipt) {}
rpc ReportSystemInfo(Host) returns (Receipt) {}
rpc ReportTask(TaskResult) returns (Receipt) {}
rpc RequestTask(Host) returns (stream Task) {}
rpc RequestTask(stream TaskResult) returns (stream Task) {}
rpc IOStream(stream IOStreamData) returns (stream IOStreamData) {}
rpc ReportGeoIP(GeoIP) returns (GeoIP) {}
}
Expand Down
72 changes: 20 additions & 52 deletions proto/nezha_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions script/bootstrap.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
touch ./cmd/dashboard/user-dist/a
touch ./cmd/dashboard/admin-dist/a
swag init --pd -d . -g ./cmd/dashboard/main.go -o ./cmd/dashboard/docs --requiredByDefault
protoc --go-grpc_out="require_unimplemented_servers=false:." --go_out="." proto/*.proto
rm -rf ../agent/proto
Expand Down
2 changes: 0 additions & 2 deletions service/rpc/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rpc
import (
"context"
"strings"
"sync"

petname "github.com/dustinkirkland/golang-petname"
"github.com/hashicorp/go-uuid"
Expand Down Expand Up @@ -64,7 +63,6 @@ func (a *authHandler) Check(ctx context.Context) (uint64, error) {
}
s.Host = &model.Host{}
s.State = &model.HostState{}
s.TaskCloseLock = new(sync.Mutex)
// generate a random silly server name
singleton.ServerList[s.ID] = &s
singleton.ServerUUIDToID[clientUUID] = s.ID
Expand Down
Loading

0 comments on commit be51c5a

Please sign in to comment.