forked from DataDog/kafka-kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
topics.go
131 lines (108 loc) · 3.63 KB
/
topics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
"errors"
"fmt"
"strconv"
)
// errInvalidReplicaType is the sentinel error reported when a replica type
// is not one of the accepted values.
var errInvalidReplicaType = errors.New("invalid replica type")
type (
	// topic is a topic name.
	topic string

	// replicaType names a replica role (leader or follower).
	replicaType string

	// brokerIDs holds replica entries as strings, each formatted as
	// partition_number:broker_id.
	brokerIDs []string

	// throttled maps a replica type (leader, follower) to its replica list.
	throttled map[replicaType]brokerIDs

	// topicThrottledReplicas maps topic names to their throttled replicas,
	// keyed by replica type. It is ultimately populated as follows:
	//   map[topic]map[leaders]["0:1001", "1:1002"]
	//   map[topic]map[followers]["2:1003", "3:1004"]
	topicThrottledReplicas map[topic]throttled
)
// acceptedReplicaTypes is the set of replica types treated as valid:
// "leaders" and "followers".
var acceptedReplicaTypes = map[replicaType]struct{}{
	replicaType("leaders"):   {},
	replicaType("followers"): {},
}
// topics lists the name of every topic present in the topicThrottledReplicas.
// The order of the returned names is unspecified (map iteration order), and
// the result is nil when no topics are held.
func (ttr topicThrottledReplicas) topics() []string {
	if len(ttr) == 0 {
		return nil
	}

	// The final length is known up front, so fill an exact-size slice.
	names := make([]string, len(ttr))
	i := 0
	for name := range ttr {
		names[i] = string(name)
		i++
	}

	return names
}
// addReplica records a throttled replica in the topicThrottledReplicas.
//
// It takes a topic, a partition number, a role (leader, follower) and a
// broker ID. The replica is stored under the topic and role as the string
// "partn:id". Adding an entry that is already present is a no-op.
//
// errInvalidReplicaType is returned, and nothing is modified, when role is
// not a key of acceptedReplicaTypes.
func (ttr topicThrottledReplicas) addReplica(topic topic, partn string, role replicaType, id string) error {
	if _, ok := acceptedReplicaTypes[role]; !ok {
		return errInvalidReplicaType
	}

	// Fetch the per-role lists for this topic, creating them on first use.
	byRole, ok := ttr[topic]
	if !ok {
		byRole = make(throttled)
		ttr[topic] = byRole
	}

	// Form the throttled replica string.
	replica := fmt.Sprintf("%s:%s", partn, id)

	// Nothing to do if the replica is already listed for this role.
	current := byRole[role]
	for i := range current {
		if current[i] == replica {
			return nil
		}
	}

	// Otherwise store the extended list; append handles a missing (nil) list.
	byRole[role] = append(current, replica)

	return nil
}
// getTopicsWithThrottledBrokers returns a topicThrottledReplicas that includes
// any topics that have partitions assigned to brokers with a static throttle
// rate set. Each matching replica is recorded under the "followers" role as
// "partition:brokerID". The "__consumer_offsets" topic is always skipped.
//
// When tm.kafkaNativeMode is false the call is delegated to
// legacyGetTopicsWithThrottledBrokers. A non-nil error is returned if the
// topic lookup fails or if a replica cannot be added to the result; in both
// cases the returned topicThrottledReplicas is nil.
func (tm *ThrottleManager) getTopicsWithThrottledBrokers() (topicThrottledReplicas, error) {
	if !tm.kafkaNativeMode {
		// Use the direct ZooKeeper config update method.
		return tm.legacyGetTopicsWithThrottledBrokers()
	}

	// Every replica found here is recorded under this role.
	const role = replicaType("followers")

	// Lookup brokers with overrides set that are not a reassignment participant.
	throttledBrokers := tm.brokerOverrides.Filter(notReassignmentParticipant)

	// Construct a topicThrottledReplicas that includes any topics with replicas
	// assigned to brokers with overrides. The throttled list only includes brokers
	// with throttles set rather than all configured replicas.
	throttleLists := make(topicThrottledReplicas)

	ctx, cancelFn := tm.kafkaRequestContext()
	defer cancelFn()

	// Get topic states.
	// NOTE(review): ".*" presumably matches every topic — confirm against
	// the DescribeTopics implementation.
	states, err := tm.ka.DescribeTopics(ctx, []string{".*"})
	if err != nil {
		return nil, err
	}

	// For each topic, check the replica assignment for all partitions. If any
	// partition has an assigned broker with a static throttle rate set, append it
	// to the throttleLists.
	for topicName, state := range states {
		// TODO(jamie): make this configurable.
		if topicName == "__consumer_offsets" {
			continue
		}

		for partition, pState := range state.PartitionStates {
			for _, brokerID := range pState.Replicas {
				id := int(brokerID)

				// Look up the broker in the throttled brokers set.
				if _, hasThrottle := throttledBrokers[id]; !hasThrottle {
					continue
				}

				// Add it to the throttleLists. The error is surfaced rather
				// than dropped so that a rejected replica cannot silently
				// produce an incomplete throttle list.
				err := throttleLists.addReplica(
					topic(topicName),
					strconv.Itoa(partition),
					role,
					strconv.Itoa(id),
				)
				if err != nil {
					return nil, fmt.Errorf("adding throttled replica for topic %q partition %d broker %d: %w",
						topicName, partition, id, err)
				}
			}
		}
	}

	return throttleLists, nil
}