forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
findcoordinator.go
170 lines (137 loc) · 4.43 KB
/
findcoordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package kafka
import (
"bufio"
"context"
"fmt"
"net"
"time"
"github.com/Andrew-Zipperer/kafka-go/protocol/findcoordinator"
)
// CoordinatorKeyType is used to specify the type of coordinator to look for
type CoordinatorKeyType int8
const (
// CoordinatorKeyTypeConsumer type is used when looking for a Group coordinator
CoordinatorKeyTypeConsumer CoordinatorKeyType = 0
// CoordinatorKeyTypeTransaction type is used when looking for a Transaction coordinator
CoordinatorKeyTypeTransaction CoordinatorKeyType = 1
)
// FindCoordinatorRequest is the request structure for the FindCoordinator function
type FindCoordinatorRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// The coordinator key.
Key string
// The coordinator key type. (Group, transaction, etc.)
KeyType CoordinatorKeyType
}
// FindCoordinatorResponseCoordinator contains details about the found coordinator
type FindCoordinatorResponseCoordinator struct {
// NodeID holds the broker id.
NodeID int
// Host of the broker
Host string
// Port on which broker accepts requests
Port int
}
// FindCoordinatorResponse is the response structure for the FindCoordinator function
type FindCoordinatorResponse struct {
// The Transaction/Group Coordinator details
Coordinator *FindCoordinatorResponseCoordinator
// The amount of time that the broker throttled the request.
Throttle time.Duration
// An error that may have occurred while attempting to retrieve Coordinator
//
// The error contains both the kafka error code, and an error message
// returned by the kafka broker.
Error error
}
// FindCoordinator sends a findCoordinator request to a kafka broker and returns the
// response.
func (c *Client) FindCoordinator(ctx context.Context, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
m, err := c.roundTrip(ctx, req.Addr, &findcoordinator.Request{
Key: req.Key,
KeyType: int8(req.KeyType),
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).FindCoordinator: %w", err)
}
res := m.(*findcoordinator.Response)
coordinator := &FindCoordinatorResponseCoordinator{
NodeID: int(res.NodeID),
Host: res.Host,
Port: int(res.Port),
}
ret := &FindCoordinatorResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Error: makeError(res.ErrorCode, res.ErrorMessage),
Coordinator: coordinator,
}
return ret, nil
}
// FindCoordinatorRequestV0 requests the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
type findCoordinatorRequestV0 struct {
// CoordinatorKey holds id to use for finding the coordinator (for groups, this is
// the groupId, for transactional producers, this is the transactional id)
CoordinatorKey string
}
func (t findCoordinatorRequestV0) size() int32 {
return sizeofString(t.CoordinatorKey)
}
func (t findCoordinatorRequestV0) writeTo(wb *writeBuffer) {
wb.writeString(t.CoordinatorKey)
}
type findCoordinatorResponseCoordinatorV0 struct {
// NodeID holds the broker id.
NodeID int32
// Host of the broker
Host string
// Port on which broker accepts requests
Port int32
}
func (t findCoordinatorResponseCoordinatorV0) size() int32 {
return sizeofInt32(t.NodeID) +
sizeofString(t.Host) +
sizeofInt32(t.Port)
}
func (t findCoordinatorResponseCoordinatorV0) writeTo(wb *writeBuffer) {
wb.writeInt32(t.NodeID)
wb.writeString(t.Host)
wb.writeInt32(t.Port)
}
func (t *findCoordinatorResponseCoordinatorV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt32(r, size, &t.NodeID); err != nil {
return
}
if remain, err = readString(r, remain, &t.Host); err != nil {
return
}
if remain, err = readInt32(r, remain, &t.Port); err != nil {
return
}
return
}
type findCoordinatorResponseV0 struct {
// ErrorCode holds response error code
ErrorCode int16
// Coordinator holds host and port information for the coordinator
Coordinator findCoordinatorResponseCoordinatorV0
}
func (t findCoordinatorResponseV0) size() int32 {
return sizeofInt16(t.ErrorCode) +
t.Coordinator.size()
}
func (t findCoordinatorResponseV0) writeTo(wb *writeBuffer) {
wb.writeInt16(t.ErrorCode)
t.Coordinator.writeTo(wb)
}
func (t *findCoordinatorResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
return
}
if remain, err = (&t.Coordinator).readFrom(r, remain); err != nil {
return
}
return
}