Skip to content

Commit

Permalink
add socket logic
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Jun 20, 2024
1 parent e081599 commit 5e89ede
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 18 deletions.
67 changes: 56 additions & 11 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ func (cr *Cluster) Enable(ctx context.Context) error {
}()
oldStatus := cr.status.Swap(clusterEnabling)
defer cr.status.CompareAndSwap(clusterEnabling, oldStatus)
return cr.enable(ctx)
}

func (cr *Cluster) enable(ctx context.Context) error {
storageStr := cr.storageManager.GetFlavorString(cr.storages)

log.TrInfof("info.cluster.enable.sending")
Expand Down Expand Up @@ -189,15 +192,64 @@ func (cr *Cluster) Enable(ctx context.Context) error {
if v := data[1]; !v.(bool) {
return fmt.Errorf("FATAL: Enable ack non true value, got (%T) %#v", v, v)
}
cr.disableSignal = make(chan struct{}, 0)
disableSignal := make(chan struct{}, 0)
cr.disableSignal = disableSignal
log.TrInfof("info.cluster.enabled")
cr.status.Store(clusterEnabled)
cr.socket.OnceConnect(func(_ *socket.Socket, ns string) {
if ns != "" {
return
}
if cr.status.Load() != clusterEnabled {
return
}
select {
case <-disableSignal:
return
default:
}
cr.status.Store(clusterEnabling)
go cr.reEnable(disableSignal)
})
return nil
}

func (cr *Cluster) reEnable(disableSignal <-chan struct{}) {
tctx, cancel := context.WithTimeout(context.Background(), time.Minute*7)
go func() {
select {
case <-tctx.Done():
case <-disableSignal:
cancel()
}
}()
err := cr.enable(tctx)
cancel()
if err != nil {
log.TrErrorf("error.cluster.enable.failed", err)
if cr.status.Load() == clusterEnabled {
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(time.Minute, func() {
cancel()
if cr.status.CompareAndSwap(clusterEnabled, clusterEnabling) {
cr.reEnable(disableSignal)
}
})
go func() {
select {
case <-ctx.Done():
case <-disableSignal:
cancel()
}
}()
}
}
}

// Disable send disable packet to central server
// The context passed in only affect the logical of Disable method
// Disable method is thread-safe, and it will wait until the first invoke exited
// Connection will not be closed after disable
func (cr *Cluster) Disable(ctx context.Context) error {
if cr.Enabled() {
cr.mux.Lock()
Expand Down Expand Up @@ -241,8 +293,8 @@ func (cr *Cluster) disable(ctx context.Context) error {
return nil
}

// markDisconnected marked the cluster as error or kicked
func (cr *Cluster) markDisconnected(kicked bool) {
// markKicked marks the cluster as kicked
func (cr *Cluster) markKicked() {
if !cr.Enabled() {
return
}
Expand All @@ -252,12 +304,5 @@ func (cr *Cluster) markDisconnected(kicked bool) {
return
}
defer close(cr.disableSignal)

var nextStatus int32
if kicked {
nextStatus = clusterKicked
} else {
nextStatus = clusterError
}
cr.status.Store(nextStatus)
cr.status.Store(clusterKicked)
}
2 changes: 2 additions & 0 deletions cluster/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type keepAliveReq struct {
}

// KeepAlive will send the keep-alive packet and fresh hits & hit bytes data
// If cluster is kicked by the central server, the cluster status will be mark as kicked
func (cr *Cluster) KeepAlive(ctx context.Context) KeepAliveRes {
hits, hbts := cr.hits.Load(), cr.hbts.Load()
resCh, err := cr.socket.EmitWithAck("keep-alive", keepAliveReq{
Expand Down Expand Up @@ -82,6 +83,7 @@ func (cr *Cluster) KeepAlive(ctx context.Context) KeepAliveRes {
cr.hits.Add(-hits2)
cr.hbts.Add(-hbts2)
if data[1] == false {
cr.markKicked()
return KeepAliveKicked
}
return KeepAliveSucceed
Expand Down
99 changes: 98 additions & 1 deletion cluster/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,110 @@ package cluster

import (
"context"
"fmt"

"github.com/LiterMC/socket.io"
"github.com/LiterMC/socket.io/engine.io"
)

// Connect connects to the central server
// The context passed in only affect the logical of Connect method
// Connection will not be closed after disable
//
// See Disconnect
func (cr *Cluster) Connect(ctx context.Context) error {
return
if !cr.Disconnected() {
return errors.New("Attempt to connect while connecting")
}
_, err := cr.GetAuthToken(ctx)
if err != nil {
return fmt.Errorf("Auth failed %w", err)
}

engio, err := engine.NewSocket(engine.Options{
Host: cr.prefix,
Path: "/socket.io/",
ExtraHeaders: http.Header{
"Origin": {cr.prefix},
"User-Agent": {build.ClusterUserAgent},
},
DialTimeout: time.Minute * 6,
})
if err != nil {
return fmt.Errorf("Could not parse Engine.IO options: %w", err)
}
if ctx.Value("cluster.options.engine-io.debug") == true {
engio.OnRecv(func(s *engine.Socket, data []byte) {
log.Debugf("Engine.IO %s recv: %q", s.ID(), (string)(data))
})
engio.OnSend(func(s *engine.Socket, data []byte) {
log.Debugf("Engine.IO %s send: %q", s.ID(), (string)(data))
})
}
engio.OnConnect(func(s *engine.Socket) {
log.Info("Engine.IO %s connected for cluster %s", s.ID(), cr.Id())
})
engio.OnDisconnect(cr.onDisconnected)
engio.OnDialError(func(s *engine.Socket, err *DialErrorContext) {
if err.Count() < 0 {
return
}
log.TrErrorf("error.cluster.connect.failed", cr.Id(), err.Count(), config.MaxReconnectCount, err.Err())
if config.MaxReconnectCount >= 0 && err.Count() >= config.MaxReconnectCount {
log.TrErrorf("error.cluster.connect.failed.toomuch", cr.Id())
s.Close()
}
})
log.Infof("Dialing %s for cluster %s", engio.URL().String(), cr.Id())
if err := engio.Dial(ctx); err != nil {
log.Errorf("Dial error: %v", err)
return false
}

cr.socket = socket.NewSocket(engio, socket.WithAuthTokenFn(func() (string, error) {
token, err := cr.GetAuthToken(ctx)
if err != nil {
log.TrErrorf("error.cluster.auth.failed", err)
return "", err
}
return token, nil
}))
cr.socket.OnError(func(_ *socket.Socket, err error) {
log.Errorf("Socket.IO error: %v", err)
})
cr.socket.OnMessage(func(event string, data []any) {
if event == "message" {
log.Infof("[remote]: %v", data[0])
}
})
log.Info("Connecting to socket.io namespace")
if err := cr.socket.Connect(""); err != nil {
log.Errorf("Namespace connect error: %v", err)
return false
}
return true
}

// Disconnect close the connection which connected to the central server
// Disconnect will not disable the cluster
//
// See Connect
func (cr *Cluster) Disconnect() error {
if cr.Disconnected() {
return
}
cr.mux.Lock()
defer cr.mux.Unlock()
err := cr.socket.Close()
cr.socketStatus.Store(socketDisconnected)
cr.socket = nil
return err
}

func (cr *Cluster) onDisconnected(s *engine.Socket, err error) {
if err != nil {
log.Warnf("Engine.IO %s disconnected: %v", s.ID(), err)
}
cr.socketStatus.Store(socketDisconnected)
cr.socket = nil
}
22 changes: 16 additions & 6 deletions cluster/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,29 @@

package cluster

const (
socketDisconnected = 0
socketConnected = 1
socketConnecting = 2
)

const (
clusterDisabled = 0
clusterEnabled = 1
clusterEnabling = 2
clusterKicked = 4
clusterError = 5
)

// Disconnected returns true if the cluster is disconnected from the central server
func (cr *Cluster) Disconnected() bool {
return cr.socketStatus.Load() == socketDisconnected
}

// Connected returns true if the cluster is connected to the central server
func (cr *Cluster) Connected() bool {
return cr.socketStatus.Load() == socketConnected
}

// Enabled returns true if the cluster is enabled or enabling
func (cr *Cluster) Enabled() bool {
s := cr.status.Load()
Expand All @@ -48,11 +63,6 @@ func (cr *Cluster) IsKicked() bool {
return cr.status.Load() == clusterKicked
}

// IsError returns true if the cluster is disabled since connection error
func (cr *Cluster) IsError() bool {
return cr.status.Load() == clusterError
}

// WaitForEnable returns a channel which receives true when cluster enabled succeed, or receives false when it failed to enable
// If the cluster is already enable, the channel always returns true
// The channel should not be used multiple times
Expand Down

0 comments on commit 5e89ede

Please sign in to comment.