-
-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathgrpcapi.go
146 lines (129 loc) · 3.79 KB
/
grpcapi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package transport
import (
"context"
"io"
pb "github.com/Jille/raft-grpc-transport/proto"
"github.com/hashicorp/raft"
)
// gRPCAPI holds the server-side handlers for requests incoming over gRPC that
// we need to relay to the Raft engine.
type gRPCAPI struct {
// manager supplies the RPC queue (rpcChan), the heartbeat callback and the
// shutdown channel that handleRPC uses.
manager *Manager
// "Unsafe" to ensure compilation fails if new methods are added but not implemented
pb.UnsafeRaftTransportServer
}
// handleRPC wraps command (and an optional data reader) in a raft.RPC, hands
// it over for processing and blocks until a response arrives.
//
// Heartbeats are passed straight to the registered heartbeat callback when one
// is set; every other request (and heartbeats without a callback) is queued on
// g.manager.rpcChan. If g.manager.shutdownCh becomes readable while queueing
// or while waiting for the response, raft.ErrTransportShutdown is returned.
// An error carried in the response is returned as-is.
func (g gRPCAPI) handleRPC(command interface{}, data io.Reader) (interface{}, error) {
	// Buffered with capacity one, so a single response can be sent without a
	// receiver being ready.
	respCh := make(chan raft.RPCResponse, 1)
	rpc := raft.RPC{Command: command, RespChan: respCh, Reader: data}

	delivered := false
	if isHeartbeat(command) {
		// Fast path: use the heartbeat callback and skip the queue in g.manager.rpcChan.
		// The callback is read under the mutex but invoked outside of it.
		g.manager.heartbeatFuncMtx.Lock()
		heartbeat := g.manager.heartbeatFunc
		g.manager.heartbeatFuncMtx.Unlock()
		if heartbeat != nil {
			heartbeat(rpc)
			delivered = true
		}
	}

	if !delivered {
		select {
		case g.manager.rpcChan <- rpc:
		case <-g.manager.shutdownCh:
			return nil, raft.ErrTransportShutdown
		}
	}

	select {
	case <-g.manager.shutdownCh:
		return nil, raft.ErrTransportShutdown
	case result := <-respCh:
		if result.Error != nil {
			return nil, result.Error
		}
		return result.Response, nil
	}
}
// AppendEntries decodes a unary AppendEntries request, relays it through
// handleRPC and returns the encoded response. ctx is not consulted.
//
// The response is asserted to be a *raft.AppendEntriesResponse; the assertion
// is unchecked and panics on any other type.
func (g gRPCAPI) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) {
	command := decodeAppendEntriesRequest(req)
	result, err := g.handleRPC(command, nil)
	if err != nil {
		return nil, err
	}
	raftResp := result.(*raft.AppendEntriesResponse)
	return encodeAppendEntriesResponse(raftResp), nil
}
// RequestVote decodes a RequestVote request, relays it through handleRPC and
// returns the encoded response. ctx is not consulted.
//
// The response is asserted to be a *raft.RequestVoteResponse; the assertion is
// unchecked and panics on any other type.
func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
	command := decodeRequestVoteRequest(req)
	result, err := g.handleRPC(command, nil)
	if err != nil {
		return nil, err
	}
	raftResp := result.(*raft.RequestVoteResponse)
	return encodeRequestVoteResponse(raftResp), nil
}
// TimeoutNow decodes a TimeoutNow request, relays it through handleRPC and
// returns the encoded response. ctx is not consulted.
//
// The response is asserted to be a *raft.TimeoutNowResponse; the assertion is
// unchecked and panics on any other type.
func (g gRPCAPI) TimeoutNow(ctx context.Context, req *pb.TimeoutNowRequest) (*pb.TimeoutNowResponse, error) {
	command := decodeTimeoutNowRequest(req)
	result, err := g.handleRPC(command, nil)
	if err != nil {
		return nil, err
	}
	raftResp := result.(*raft.TimeoutNowResponse)
	return encodeTimeoutNowResponse(raftResp), nil
}
// RequestPreVote decodes a RequestPreVote request, relays it through handleRPC
// and returns the encoded response. ctx is not consulted.
//
// The response is asserted to be a *raft.RequestPreVoteResponse; the assertion
// is unchecked and panics on any other type.
func (g gRPCAPI) RequestPreVote(ctx context.Context, req *pb.RequestPreVoteRequest) (*pb.RequestPreVoteResponse, error) {
	command := decodeRequestPreVoteRequest(req)
	result, err := g.handleRPC(command, nil)
	if err != nil {
		return nil, err
	}
	raftResp := result.(*raft.RequestPreVoteResponse)
	return encodeRequestPreVoteResponse(raftResp), nil
}
// InstallSnapshot handles the client-streaming InstallSnapshot call.
//
// The first message on the stream is decoded into the request; its data bytes,
// followed by the data of every later message, are exposed to handleRPC as an
// io.Reader through snapshotStream. Once handleRPC returns, the encoded
// response is sent and the stream is closed.
//
// The response is asserted to be a *raft.InstallSnapshotResponse; the
// assertion is unchecked and panics on any other type.
func (g gRPCAPI) InstallSnapshot(s pb.RaftTransport_InstallSnapshotServer) error {
	first, err := s.Recv()
	if err != nil {
		return err
	}

	command := decodeInstallSnapshotRequest(first)
	body := &snapshotStream{s: s, buf: first.GetData()}

	result, err := g.handleRPC(command, body)
	if err != nil {
		return err
	}
	raftResp := result.(*raft.InstallSnapshotResponse)
	return s.SendAndClose(encodeInstallSnapshotResponse(raftResp))
}
// snapshotStream exposes the data carried by an InstallSnapshot gRPC stream
// as an io.Reader (see its Read method).
type snapshotStream struct {
// s is the stream from which further messages are received.
s pb.RaftTransport_InstallSnapshotServer
// buf holds data bytes that were already received but not yet handed out by Read.
buf []byte
}
// Read implements io.Reader on top of the InstallSnapshot stream.
//
// Bytes left over from a previously received message are returned first. When
// nothing is buffered, messages are received from the stream until one carries
// data or Recv fails; the Recv error (whatever the stream reports at its end,
// or on failure) is returned unchanged with a zero byte count.
//
// Messages with an empty data field are skipped rather than surfaced as
// (0, nil), which the io.Reader contract discourages for a non-empty b. A
// zero-length b returns (0, nil) immediately without touching the stream.
func (s *snapshotStream) Read(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}
	// Refill from the stream until there is something to hand out.
	for len(s.buf) == 0 {
		m, err := s.s.Recv()
		if err != nil {
			return 0, err
		}
		s.buf = m.GetData()
	}
	n := copy(b, s.buf)
	// Keep whatever did not fit in b for the next call.
	s.buf = s.buf[n:]
	return n, nil
}
// AppendEntriesPipeline serves the bidirectional AppendEntries stream.
//
// Requests are handled strictly one at a time: each message is received,
// relayed through handleRPC, and its encoded response is sent before the next
// message is read. The loop ends, returning the error, as soon as Recv,
// handleRPC or Send fails.
//
// Each response is asserted to be a *raft.AppendEntriesResponse; the assertion
// is unchecked and panics on any other type.
func (g gRPCAPI) AppendEntriesPipeline(s pb.RaftTransport_AppendEntriesPipelineServer) error {
	for {
		req, err := s.Recv()
		if err != nil {
			return err
		}

		result, err := g.handleRPC(decodeAppendEntriesRequest(req), nil)
		if err != nil {
			// TODO(quis): One failure doesn't have to break the entire stream?
			// Or does it all go wrong when it's out of order anyway?
			return err
		}

		reply := encodeAppendEntriesResponse(result.(*raft.AppendEntriesResponse))
		if err := s.Send(reply); err != nil {
			return err
		}
	}
}
// isHeartbeat reports whether command is a *raft.AppendEntriesRequest that
// has a non-zero term and a non-empty leader but carries nothing else: no
// previous log entry or term, no entries and no leader commit index.
// Any other command type yields false.
func isHeartbeat(command interface{}) bool {
	ae, isAppend := command.(*raft.AppendEntriesRequest)
	if !isAppend {
		return false
	}
	if ae.Term == 0 || len(ae.Leader) == 0 {
		return false
	}
	carriesLogState := ae.PrevLogEntry != 0 ||
		ae.PrevLogTerm != 0 ||
		len(ae.Entries) != 0 ||
		ae.LeaderCommitIndex != 0
	return !carriesLogState
}