Skip to content

Commit eec8742

Browse files
committed
Add tests for etcd Peer
1 parent 2ea6541 commit eec8742

File tree

2 files changed

+116
-4
lines changed

2 files changed

+116
-4
lines changed

cluster/raft/etcd/peer.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ import (
3535

3636
var (
3737
ErrInvalidID = errors.New("node name must be a number")
38-
39-
appliedWaitDelay = 100 * time.Millisecond
4038
)
4139

4240
type commit struct {
@@ -285,7 +283,6 @@ func (p *Peer) serveRaft() {
285283
default:
286284
log.Fatal("[raft] failed to serve rafthttp", "error", err)
287285
}
288-
close(p.httpStopC)
289286
}
290287

291288
func (p *Peer) serveChannels() {
@@ -622,7 +619,7 @@ func (p *Peer) IsApplyRight() bool {
622619
}
623620

624621
func (p *Peer) GetLeader() (addr, id string) {
625-
return
622+
return "", strconv.FormatUint(p.node.Status().SoftState.Lead, 10)
626623
}
627624

628625
func (p *Peer) GenPeersFile(file string) error {

cluster/raft/etcd/peer_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package etcd
2+
3+
import (
4+
"net"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
"github.com/wind-c/comqtt/v2/cluster/log"
11+
"github.com/wind-c/comqtt/v2/cluster/message"
12+
"github.com/wind-c/comqtt/v2/config"
13+
"github.com/wind-c/comqtt/v2/mqtt/packets"
14+
)
15+
16+
func createTestPeer(t *testing.T) *Peer {
17+
conf := &config.Cluster{
18+
NodeName: "1",
19+
BindAddr: "127.0.0.1",
20+
RaftImpl: config.RaftImplEtcd,
21+
RaftPort: 8948,
22+
RaftDir: t.TempDir(),
23+
RaftBootstrap: true,
24+
}
25+
notifyCh := make(chan *message.Message, 1)
26+
peer, err := Setup(conf, notifyCh)
27+
require.Nil(t, err)
28+
return peer
29+
}
30+
31+
func TestJoinAndLeave(t *testing.T) {
32+
log.Init(log.DefaultOptions())
33+
peer := createTestPeer(t)
34+
defer peer.Stop()
35+
36+
node2ID := "2"
37+
node2Host := "127.0.0.1"
38+
node2Port := 8949
39+
conf2 := &config.Cluster{
40+
NodeName: node2ID,
41+
BindAddr: node2Host,
42+
RaftImpl: config.RaftImplEtcd,
43+
RaftPort: node2Port,
44+
RaftDir: t.TempDir(),
45+
RaftBootstrap: true,
46+
}
47+
notifyCh := make(chan *message.Message, 1)
48+
_, err := Setup(conf2, notifyCh)
49+
require.NoError(t, err)
50+
51+
node2IDUint, err := strconv.ParseUint(node2ID, 10, 64)
52+
require.NoError(t, err)
53+
54+
// Test Join
55+
err = peer.Join(node2ID, net.JoinHostPort(node2Host, strconv.Itoa(node2Port)))
56+
require.NoError(t, err)
57+
58+
time.Sleep(2 * time.Second)
59+
60+
found := false
61+
for _, majorityConfig := range peer.node.Status().Config.Voters {
62+
if _, ok := majorityConfig[node2IDUint]; ok {
63+
found = true
64+
break
65+
}
66+
}
67+
require.True(t, found)
68+
69+
err = peer.Leave(node2ID)
70+
require.NoError(t, err)
71+
72+
time.Sleep(2 * time.Second)
73+
74+
found = false
75+
for _, c := range peer.node.Status().Config.Voters {
76+
t.Log(found)
77+
if _, ok := c[node2IDUint]; ok {
78+
found = true
79+
break
80+
}
81+
}
82+
require.False(t, found)
83+
}
84+
85+
func TestProposeAndLookup(t *testing.T) {
86+
peer := createTestPeer(t)
87+
defer peer.Stop()
88+
89+
msg := &message.Message{
90+
Type: packets.Subscribe,
91+
NodeID: "1",
92+
Payload: []byte("filter"),
93+
}
94+
95+
err := peer.Propose(msg)
96+
require.NoError(t, err)
97+
98+
key := "filter"
99+
expectedValue := "1"
100+
101+
time.Sleep(2 * time.Second)
102+
103+
result := peer.Lookup(key)
104+
require.Equal(t, []string{expectedValue}, result)
105+
}
106+
107+
func TestGetLeader(t *testing.T) {
108+
peer := createTestPeer(t)
109+
defer peer.Stop()
110+
111+
time.Sleep(2 * time.Second)
112+
113+
_, id := peer.GetLeader()
114+
require.Equal(t, "1", id)
115+
}

0 commit comments

Comments
 (0)