Skip to content

Commit

Permalink
fix some bugs
Browse files Browse the repository at this point in the history
* add servertime on handshake
* slice del bug fixed
* bug fixed #84
  • Loading branch information
zlonglin authored Sep 6, 2022
1 parent 2be8399 commit 7699d53
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
19 changes: 15 additions & 4 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (*
}

resp := &clusterpb.RegisterResponse{}
for _, m := range c.members {
for k, m := range c.members {
if m.memberInfo.ServiceAddr == req.MemberInfo.ServiceAddr {
return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr)
// 节点异常崩溃,不会执行unregister,此时再次启动该节点,由于已存在注册信息,将再也无法成功注册,这里做个修改,先移除后重新注册
if k >= len(c.members)-1 {
c.members = c.members[:k]
} else {
c.members = append(c.members[:k], c.members[k+1:]...)
}
break
//return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr)
}
}

Expand Down Expand Up @@ -126,7 +133,7 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest
// Register services to current node
c.currentNode.handler.delMember(req.ServiceAddr)
c.mu.Lock()
if index == len(c.members)-1 {
if index >= len(c.members)-1 {
c.members = c.members[:index]
} else {
c.members = append(c.members[:index], c.members[index+1:]...)
Expand Down Expand Up @@ -187,7 +194,11 @@ func (c *cluster) delMember(addr string) {
}
}
if index != -1 {
c.members = append(c.members[:index], c.members[index+1:]...)
if index >= len(c.members)-1 {
c.members = c.members[:index]
} else {
c.members = append(c.members[:index], c.members[index+1:]...)
}
}
c.mu.Unlock()
}
34 changes: 29 additions & 5 deletions cluster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,30 @@ var (
type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool)

func cache() {
data, err := json.Marshal(map[string]interface{}{
hrdata := map[string]interface{}{
"code": 200,
"sys": map[string]float64{"heartbeat": env.Heartbeat.Seconds()},
})
"sys": map[string]interface{}{
"heartbeat": env.Heartbeat.Seconds(),
"servertime": time.Now().UTC().Unix(),
},
}
if dict, ok := message.GetDictionary(); ok {
hrdata = map[string]interface{}{
"code": 200,
"sys": map[string]interface{}{
"heartbeat": env.Heartbeat.Seconds(),
"servertime": time.Now().UTC().Unix(),
"dict": dict,
},
}
}
// data, err := json.Marshal(map[string]interface{}{
// "code": 200,
// "sys": map[string]float64{
// "heartbeat": env.Heartbeat.Seconds(),
// },
// })
data, err := json.Marshal(hrdata)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -140,7 +160,11 @@ func (h *LocalHandler) delMember(addr string) {
for name, members := range h.remoteServices {
for i, maddr := range members {
if addr == maddr.ServiceAddr {
members = append(members[:i], members[i+1:]...)
if i >= len(members)-1 {
members = members[:i]
} else {
members = append(members[:i], members[i+1:]...)
}
}
}
if len(members) == 0 {
Expand Down Expand Up @@ -192,7 +216,7 @@ func (h *LocalHandler) handle(conn net.Conn) {

members := h.currentNode.cluster.remoteAddrs()
for _, remote := range members {
log.Println("Notify remote server success", remote)
log.Println("Notify remote server", remote)
pool, err := h.currentNode.rpcClient.getConnPool(remote)
if err != nil {
log.Println("Cannot retrieve connection pool for address", remote, err)
Expand Down
7 changes: 7 additions & 0 deletions internal/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,10 @@ func SetDictionary(dict map[string]uint16) {
codes[code] = r
}
}

func GetDictionary() (map[string]uint16, bool) {
if len(routes) <= 0 {
return nil, false
}
return routes, true
}

0 comments on commit 7699d53

Please sign in to comment.