Skip to content

Commit 41cc8d4

Browse files
committed
Add tests for Serf
1 parent 7c0cd02 commit 41cc8d4

File tree

5 files changed

+192
-27
lines changed

5 files changed

+192
-27
lines changed

cluster/agent_test.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,32 @@
11
package cluster
22

33
import (
4-
"net"
54
"strconv"
65
"testing"
76
"time"
87

98
"github.com/stretchr/testify/require"
109
"github.com/wind-c/comqtt/v2/cluster/log"
10+
"github.com/wind-c/comqtt/v2/cluster/utils"
1111
"github.com/wind-c/comqtt/v2/config"
1212
)
1313

14-
func getFreePort() (int, error) {
15-
listener, err := net.Listen("tcp", ":0")
16-
if err != nil {
17-
return 0, err
18-
}
19-
defer listener.Close()
20-
return listener.Addr().(*net.TCPAddr).Port, nil
21-
}
22-
2314
func TestCluster(t *testing.T) {
2415
log.Init(log.DefaultOptions())
2516

26-
bindPort1, err := getFreePort()
17+
bindPort1, err := utils.GetFreePort()
2718
require.NoError(t, err, "Failed to get free port for node1")
28-
raftPort1, err := getFreePort()
19+
raftPort1, err := utils.GetFreePort()
2920
require.NoError(t, err, "Failed to get free port for node1 Raft")
3021

31-
bindPort2, err := getFreePort()
22+
bindPort2, err := utils.GetFreePort()
3223
require.NoError(t, err, "Failed to get free port for node2")
33-
raftPort2, err := getFreePort()
24+
raftPort2, err := utils.GetFreePort()
3425
require.NoError(t, err, "Failed to get free port for node2 Raft")
3526

36-
bindPort3, err := getFreePort()
27+
bindPort3, err := utils.GetFreePort()
3728
require.NoError(t, err, "Failed to get free port for node3")
38-
raftPort3, err := getFreePort()
29+
raftPort3, err := utils.GetFreePort()
3930
require.NoError(t, err, "Failed to get free port for node3 Raft")
4031

4132
members := []string{

cluster/discovery/node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ package discovery
66

77
import (
88
"encoding/json"
9-
"github.com/wind-c/comqtt/v2/mqtt"
109
"net"
1110
"os"
1211
"strconv"
12+
13+
"github.com/wind-c/comqtt/v2/mqtt"
1314
)
1415

1516
const (
@@ -31,7 +32,6 @@ type Node interface {
3132
BindMqttServer(server *mqtt.Server)
3233
LocalAddr() string
3334
LocalName() string
34-
NumMembers() int
3535
Members() []Member
3636
EventChan() <-chan *Event
3737
SendToNode(nodeName string, msg []byte) error

cluster/discovery/serf/membership.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"strconv"
99

1010
"github.com/hashicorp/logutils"
11-
"github.com/hashicorp/memberlist"
1211
"github.com/hashicorp/serf/serf"
1312
mb "github.com/wind-c/comqtt/v2/cluster/discovery"
1413
"github.com/wind-c/comqtt/v2/cluster/log"
@@ -99,8 +98,8 @@ func (m *Membership) EventChan() <-chan *mb.Event {
9998
return m.eventCh
10099
}
101100

102-
func (m *Membership) NumMembers() int {
103-
return m.serf.NumNodes()
101+
func (m *Membership) numMembers() int {
102+
return len(m.aliveMembers())
104103
}
105104

106105
func (m *Membership) LocalName() string {
@@ -195,10 +194,6 @@ func (m *Membership) eventLoop() {
195194
}
196195
}
197196

198-
func (m *Membership) send(to memberlist.Address, msg []byte) error {
199-
return m.serf.Memberlist().SendToAddress(to, msg)
200-
}
201-
202197
// SendToOthers send message to all nodes except yourself
203198
func (m *Membership) SendToOthers(msg []byte) {
204199
m.Broadcast(msg)
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package serf
2+
3+
import (
4+
"os"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/wind-c/comqtt/v2/cluster/log"
11+
"github.com/wind-c/comqtt/v2/cluster/utils"
12+
"github.com/wind-c/comqtt/v2/config"
13+
)
14+
15+
func TestMain(m *testing.M) {
16+
log.Init(log.DefaultOptions())
17+
code := m.Run()
18+
os.Exit(code)
19+
}
20+
21+
func TestJoinAndLeave(t *testing.T) {
22+
bindPort1, err := utils.GetFreePort()
23+
assert.NoError(t, err)
24+
conf1 := &config.Cluster{
25+
BindAddr: "127.0.0.1",
26+
BindPort: bindPort1,
27+
NodeName: "test-node-1",
28+
}
29+
inboundMsgCh1 := make(chan []byte)
30+
membership1 := New(conf1, inboundMsgCh1)
31+
err = membership1.Setup()
32+
assert.NoError(t, err)
33+
defer membership1.Stop()
34+
35+
assert.Equal(t, 1, membership1.numMembers())
36+
37+
bindPort2, err := utils.GetFreePort()
38+
assert.NoError(t, err)
39+
conf2 := &config.Cluster{
40+
BindAddr: "127.0.0.1",
41+
BindPort: bindPort2,
42+
NodeName: "test-node-2",
43+
}
44+
inboundMsgCh2 := make(chan []byte)
45+
membership2 := New(conf2, inboundMsgCh2)
46+
err = membership2.Setup()
47+
assert.NoError(t, err)
48+
defer membership2.Stop()
49+
50+
numJoined, err := membership2.Join([]string{"127.0.0.1:" + strconv.Itoa(bindPort1)})
51+
assert.NoError(t, err)
52+
assert.Equal(t, numJoined, 1)
53+
assert.Equal(t, 2, membership1.numMembers())
54+
assert.Equal(t, 2, membership2.numMembers())
55+
56+
t.Log("Leave node 2")
57+
err = membership2.Leave()
58+
assert.NoError(t, err)
59+
60+
time.Sleep(5 * time.Second)
61+
62+
assert.Equal(t, 1, membership1.numMembers())
63+
}
64+
65+
func TestSendToNode(t *testing.T) {
66+
bindPort1, err := utils.GetFreePort()
67+
assert.NoError(t, err)
68+
bindPort2, err := utils.GetFreePort()
69+
assert.NoError(t, err)
70+
71+
conf1 := &config.Cluster{
72+
BindAddr: "127.0.0.1",
73+
BindPort: bindPort1,
74+
NodeName: "test-node-1",
75+
}
76+
conf2 := &config.Cluster{
77+
BindAddr: "127.0.0.1",
78+
BindPort: bindPort2,
79+
NodeName: "test-node-2",
80+
Members: []string{"127.0.0.1:" + strconv.Itoa(bindPort1)},
81+
}
82+
inboundMsgCh1 := make(chan []byte)
83+
inboundMsgCh2 := make(chan []byte)
84+
85+
membership1 := New(conf1, inboundMsgCh1)
86+
err = membership1.Setup()
87+
defer membership1.Stop()
88+
assert.NoError(t, err)
89+
90+
membership2 := New(conf2, inboundMsgCh2)
91+
err = membership2.Setup()
92+
defer membership2.Stop()
93+
assert.NoError(t, err)
94+
95+
time.Sleep(3 * time.Second)
96+
97+
err = membership1.SendToNode("test-node-2", []byte("test message"))
98+
assert.NoError(t, err)
99+
100+
select {
101+
case msg := <-inboundMsgCh2:
102+
assert.Equal(t, []byte("test message"), msg)
103+
case <-time.After(5 * time.Second):
104+
t.Fatal("Did not receive the message in membership2")
105+
}
106+
}
107+
108+
func TestSendToOthers(t *testing.T) {
109+
bindPort1, err := utils.GetFreePort()
110+
assert.NoError(t, err)
111+
bindPort2, err := utils.GetFreePort()
112+
assert.NoError(t, err)
113+
bindPort3, err := utils.GetFreePort()
114+
assert.NoError(t, err)
115+
116+
conf1 := &config.Cluster{
117+
BindAddr: "127.0.0.1",
118+
BindPort: bindPort1,
119+
NodeName: "test-node-1",
120+
}
121+
conf2 := &config.Cluster{
122+
BindAddr: "127.0.0.1",
123+
BindPort: bindPort2,
124+
NodeName: "test-node-2",
125+
Members: []string{"127.0.0.1:" + strconv.Itoa(bindPort1)},
126+
}
127+
conf3 := &config.Cluster{
128+
BindAddr: "127.0.0.1",
129+
BindPort: bindPort3,
130+
NodeName: "test-node-3",
131+
Members: []string{"127.0.0.1:" + strconv.Itoa(bindPort1)},
132+
}
133+
inboundMsgCh1 := make(chan []byte)
134+
inboundMsgCh2 := make(chan []byte)
135+
inboundMsgCh3 := make(chan []byte)
136+
137+
membership1 := New(conf1, inboundMsgCh1)
138+
err = membership1.Setup()
139+
defer membership1.Stop()
140+
assert.NoError(t, err)
141+
142+
membership2 := New(conf2, inboundMsgCh2)
143+
err = membership2.Setup()
144+
defer membership2.Stop()
145+
assert.NoError(t, err)
146+
147+
membership3 := New(conf3, inboundMsgCh3)
148+
err = membership3.Setup()
149+
defer membership3.Stop()
150+
assert.NoError(t, err)
151+
152+
time.Sleep(3 * time.Second)
153+
154+
membership1.SendToOthers([]byte("test message"))
155+
156+
select {
157+
case msg := <-inboundMsgCh2:
158+
assert.Equal(t, []byte("test message"), msg)
159+
case <-time.After(5 * time.Second):
160+
t.Fatal("Did not receive the message in membership2")
161+
}
162+
163+
select {
164+
case msg := <-inboundMsgCh3:
165+
assert.Equal(t, []byte("test message"), msg)
166+
case <-time.After(5 * time.Second):
167+
t.Fatal("Did not receive the message in membership3")
168+
}
169+
}

cluster/utils/utils.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ package utils
66

77
import (
88
"fmt"
9-
"github.com/hashicorp/go-sockaddr"
109
"net"
1110
"os"
1211
"reflect"
1312
"strings"
1413
"testing"
1514

16-
"github.com/satori/go.uuid"
15+
"github.com/hashicorp/go-sockaddr"
16+
17+
uuid "github.com/satori/go.uuid"
1718
)
1819

1920
func InArray(val interface{}, array interface{}) bool {
@@ -151,3 +152,12 @@ func PathExists(path string) bool {
151152
}
152153
return true
153154
}
155+
156+
func GetFreePort() (int, error) {
157+
listener, err := net.Listen("tcp", ":0")
158+
if err != nil {
159+
return 0, err
160+
}
161+
defer listener.Close()
162+
return listener.Addr().(*net.TCPAddr).Port, nil
163+
}

0 commit comments

Comments
 (0)