Skip to content

Commit ba4d0d8

Browse files
authored
Merge pull request #13 from zerodha/develop
Merge develop to master
2 parents a46ec5d + eac33a4 commit ba4d0d8

File tree

7 files changed

+28
-22
lines changed

7 files changed

+28
-22
lines changed

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
<a href="https://zerodha.tech"><img src="https://zerodha.tech/static/images/github-badge.svg" align="right" /></a>
2-
## Relay
2+
## kaf-relay
33

4-
Relay is an opinionated program designed to replicate messages on topics from one Kafka cluster to another Kafka cluster.
4+
kaf-relay is an opinionated, high-performance program for keeping Kafka clusters in sync by replicating topics. It is specifically designed for high-availability with background healthchecks, offset tracking, and topic lag checks.
55

66
### Features
77

88
* Topic Forwarding: Relay consumes messages from topics in one Kafka cluster and forwards them to topics in another Kafka cluster.
99
* Authentication: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
1010
* Topic Remapping: Relay allows you to specify topic remappings, enabling you to map a topic from the source cluster to a different topic in the destination cluster.
11-
* Consumer group failover: Assuming we have multiple identical kafkas (separate nodes 1...N) at the upstream side, this mode allows us to fallback to the next kafka in a round-robin fashion if current broker goes down. This allows us to deduplicate messages downstream without using any external stores.
12-
* Topic lag failover: In addition to consumer group failover, if there is a lag between node1 and node2, we initiate an immediate switch-over to node2.
11+
* Consumer group failover: Given identical Kafka instances (separate nodes 1...N) at the upstream, instantly switch over to the next node in a round-robin fashion on current node failure. Offset tracking between source and target nodes allows de-duplication without external stores.
12+
* Topic lag failover: Monitors offsets amongst N identical nodes to detect lags and to instantly switch upstream consumer nodes.
1313
* Stop at end: Flag `--stop-at-end` allows the program to stop after reaching the end of consumer topic offsets that were picked up on boot.
1414
* Filter messages using go plugins: Flag `--filter` allows the program to filter messages based on the logic in plugin code.
1515

16-
#### relay in different modes
16+
#### kaf-relay in different modes
1717

1818
![image](./screenshots/relay.png)
1919

@@ -25,7 +25,7 @@ Relay is an opinionated program designed to replicate messages on topics from on
2525
## Installation
2626

2727
```bash
28-
git clone https://github.com/joeirimpan/kaf-relay.git
28+
git clone https://github.com/zerodha/kaf-relay.git
2929
cd kaf-relay
3030
make dist
3131
```
@@ -92,4 +92,4 @@ kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition=
9292
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="3"} 44
9393
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="4"} 44
9494
kafka_relay_msg_count{source="topicA", destination="machineX_topicA", partition="5"} 100
95-
```
95+
```

config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ type ConsumerGroupCfg struct {
7373
ClientCfg `koanf:",squash"`
7474

7575
GroupID string `koanf:"group_id"`
76+
InstanceID string `koanf:"instance_id"`
7677
MaxWaitTime time.Duration `koanf:"max_wait_time"`
7778

7879
MaxFailovers int `koanf:"max_failovers"`

config.toml.sample

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ max_wait_time = "10ms"
3434
offset_commit_interval = "100ms"
3535
initial_offset = "start"
3636
group_id = "relay_group1"
37+
instance_id = "kaf_relay"
3738

3839
max_failovers = -1 # infinite
3940

@@ -57,6 +58,7 @@ max_wait_time = "10ms"
5758
offset_commit_interval = "100ms"
5859
initial_offset = "start"
5960
group_id = "relay_group2"
61+
instance_id = "kaf_relay"
6062

6163
max_failovers = -1 # infinite
6264

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module github.com/joeirimpan/kaf-relay
1+
module github.com/zerodha/kaf-relay
22

33
go 1.21
44

initz.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,9 @@ import (
1010
"strings"
1111

1212
"github.com/VictoriaMetrics/metrics"
13-
"github.com/joeirimpan/kaf-relay/filter"
1413
"github.com/knadh/koanf/v2"
1514
"github.com/twmb/franz-go/pkg/kgo"
16-
)
17-
18-
var (
19-
// instanceID is for the static consumer group member.
20-
instanceID = "kaf-relay"
15+
"github.com/zerodha/kaf-relay/filter"
2116
)
2217

2318
// getProducerClient returns a kafka producer client.
@@ -154,7 +149,7 @@ func initConsumerGroup(ctx context.Context, cfg ConsumerGroupCfg, l *slog.Logger
154149
kgo.FetchMaxWait(cfg.MaxWaitTime),
155150
kgo.ConsumeTopics(cfg.Topics...),
156151
kgo.ConsumerGroup(cfg.GroupID),
157-
kgo.InstanceID(instanceID),
152+
kgo.InstanceID(cfg.InstanceID),
158153
kgo.SessionTimeout(cfg.SessionTimeout),
159154
kgo.DisableAutoCommit(),
160155
kgo.OnPartitionsAssigned(onAssigned),

relay.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99
"time"
1010

1111
"github.com/VictoriaMetrics/metrics"
12-
"github.com/joeirimpan/kaf-relay/filter"
1312
"github.com/twmb/franz-go/pkg/kadm"
1413
"github.com/twmb/franz-go/pkg/kgo"
14+
"github.com/zerodha/kaf-relay/filter"
1515
)
1616

1717
// relay represents the input, output kafka and the remapping necessary to forward messages from one topic to another.

utils.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func appendSASL(opts []kgo.Opt, cfg ClientCfg) []kgo.Opt {
185185
// leaveAndResetOffsets leaves the current consumer group and resets its offset if given.
186186
func leaveAndResetOffsets(ctx context.Context, cl *kgo.Client, cfg ConsumerGroupCfg, offsets map[string]map[int32]kgo.Offset, l *slog.Logger) error {
187187
// leave group; mark the group as `Empty` before attempting to reset offsets.
188-
l.Debug("leaving group", "group_id", cfg.GroupID)
188+
l.Debug("leaving group", "group_id", cfg.GroupID, "instance_id", cfg.InstanceID)
189189
if err := leaveGroup(ctx, cl, cfg); err != nil {
190190
return err
191191
}
@@ -203,12 +203,20 @@ func leaveAndResetOffsets(ctx context.Context, cl *kgo.Client, cfg ConsumerGroup
203203

204204
// leaveGroup leaves the consumer group with our instance id
205205
func leaveGroup(ctx context.Context, cl *kgo.Client, cfg ConsumerGroupCfg) error {
206-
req := kmsg.NewPtrLeaveGroupRequest()
207-
req.Group = cfg.GroupID
206+
l := kadm.LeaveGroup(cfg.GroupID).
207+
Reason("resetting offsets").
208+
InstanceIDs(cfg.InstanceID)
208209

209-
req.Members = []kmsg.LeaveGroupRequestMember{{InstanceID: &instanceID}}
210-
_, err := req.RequestWith(ctx, cl)
211-
return err
210+
resp, err := kadm.NewClient(cl).LeaveGroup(ctx, l)
211+
if err != nil {
212+
return err
213+
}
214+
215+
if err := resp.Error(); err != nil {
216+
return err
217+
}
218+
219+
return nil
212220
}
213221

214222
// resetOffsets resets the consumer group with the given offsets map.

0 commit comments

Comments
 (0)