diff --git a/cluster/raft/etcd/peer.go b/cluster/raft/etcd/peer.go index 5d8c1b1..0241b68 100644 --- a/cluster/raft/etcd/peer.go +++ b/cluster/raft/etcd/peer.go @@ -35,8 +35,6 @@ import ( var ( ErrInvalidID = errors.New("node name must be a number") - - appliedWaitDelay = 100 * time.Millisecond ) type commit struct { @@ -285,7 +283,6 @@ func (p *Peer) serveRaft() { default: log.Fatal("[raft] failed to serve rafthttp", "error", err) } - close(p.httpStopC) } func (p *Peer) serveChannels() { @@ -622,7 +619,7 @@ func (p *Peer) IsApplyRight() bool { } func (p *Peer) GetLeader() (addr, id string) { - return + return "", strconv.FormatUint(p.node.Status().SoftState.Lead, 10) } func (p *Peer) GenPeersFile(file string) error { diff --git a/cluster/raft/etcd/peer_test.go b/cluster/raft/etcd/peer_test.go new file mode 100644 index 0000000..ea60dc9 --- /dev/null +++ b/cluster/raft/etcd/peer_test.go @@ -0,0 +1,119 @@ +package etcd + +import ( + "net" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/wind-c/comqtt/v2/cluster/log" + "github.com/wind-c/comqtt/v2/cluster/message" + "github.com/wind-c/comqtt/v2/config" + "github.com/wind-c/comqtt/v2/mqtt/packets" +) + +func createTestPeer(t *testing.T) *Peer { + conf := &config.Cluster{ + NodeName: "1", + BindAddr: "127.0.0.1", + RaftImpl: config.RaftImplEtcd, + RaftPort: 8946, + RaftDir: t.TempDir(), + RaftBootstrap: true, + } + notifyCh := make(chan *message.Message, 1) + peer, err := Setup(conf, notifyCh) + require.Nil(t, err) + return peer +} + +func TestJoinAndLeave(t *testing.T) { + log.Init(log.DefaultOptions()) + peer := createTestPeer(t) + defer peer.Stop() + + node2ID := "2" + node2Host := "127.0.0.1" + node2Port := 8947 + conf2 := &config.Cluster{ + NodeName: node2ID, + BindAddr: node2Host, + RaftImpl: config.RaftImplEtcd, + RaftPort: node2Port, + RaftDir: t.TempDir(), + RaftBootstrap: true, + } + notifyCh := make(chan *message.Message, 1) + _, err := Setup(conf2, notifyCh) + require.Nil(t, err) + + node2IDUint, err := strconv.ParseUint(node2ID, 10, 64) + require.NoError(t, err) + + // Test Join + err = peer.Join(node2ID, net.JoinHostPort(node2Host, strconv.Itoa(node2Port))) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + status := peer.node.Status() + + found := false + for _, majorityConfig := range status.Config.Voters { + if _, ok := majorityConfig[node2IDUint]; ok { + found = true + break + } + } + require.True(t, found) + + err = peer.Leave(node2ID) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + status = peer.node.Status() + + found = false + for _, c := range status.Config.Voters { + t.Log(found) + if _, ok := c[node2IDUint]; ok { + found = true + break + } + } + require.False(t, found) +} + +func TestProposeAndLookup(t *testing.T) { + peer := createTestPeer(t) + defer peer.Stop() + + msg := &message.Message{ + Type: packets.Subscribe, + NodeID: "1", + Payload: []byte("filter"), + } + + err := peer.Propose(msg) + require.NoError(t, err) + + key := "filter" + expectedValue := "1" + + time.Sleep(2 * time.Second) + + result := peer.Lookup(key) + require.Equal(t, []string{expectedValue}, result) +} + +func TestGetLeader(t *testing.T) { + peer := createTestPeer(t) + defer peer.Stop() + + time.Sleep(2 * time.Second) + + _, id := peer.GetLeader() + require.Equal(t, "1", id) +}