Skip to content

Commit 311a680

Browse files
author
Lakshay Kalbhor
committed
fix: batch channel should be buffered
1 parent f3cca64 commit 311a680

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

internal/relay/relay.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -244,17 +244,21 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
244244
return nil
245245
}
246246

247-
// Queue the message for writing to target.
248-
select {
249-
case <-ctx.Done():
250-
return ctx.Err()
251-
252-
case re.target.GetBatchCh() <- &kgo.Record{
247+
msg := &kgo.Record{
253248
Key: rec.Key,
254249
Value: rec.Value,
255250
Topic: t.TargetTopic,
256251
Partition: rec.Partition,
257-
}:
252+
}
253+
254+
// Queue the message for writing to target.
255+
select {
256+
case <-ctx.Done():
257+
return ctx.Err()
258+
case re.target.GetBatchCh() <- msg:
259+
default:
260+
re.log.Error("target inlet channel blocked")
261+
re.target.GetBatchCh() <- msg
258262
}
259263

260264
return nil

internal/relay/target.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ type Target struct {
3434
// Map of target topics and their config.
3535
targetTopics Topics
3636

37-
batchCh chan *kgo.Record
38-
batch []*kgo.Record
37+
// Inlet receives relayed messages into target for batching
38+
inletCh chan *kgo.Record
39+
40+
// Holds the active batch that is produced to destination topic
41+
batch []*kgo.Record
3942
}
4043

4144
// NewTarget returns a new producer relay that handles target Kafka instances.
@@ -49,7 +52,7 @@ func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topic
4952
targetTopics: topics,
5053

5154
batch: make([]*kgo.Record, 0, pCfg.BatchSize),
52-
batchCh: make(chan *kgo.Record),
55+
inletCh: make(chan *kgo.Record, pCfg.BatchSize*10),
5356
}
5457

5558
// Initialize the actual Kafka client.
@@ -73,12 +76,12 @@ func (tg *Target) Close() {
7376

7477
// CloseBatchCh closes the Producer batch channel.
7578
func (tg *Target) CloseBatchCh() {
76-
close(tg.batchCh)
79+
close(tg.inletCh)
7780
}
7881

7982
// GetBatchCh returns the Producer batch channel.
8083
func (tg *Target) GetBatchCh() chan *kgo.Record {
81-
return tg.batchCh
84+
return tg.inletCh
8285
}
8386

8487
// prepareRecord checks if custom topic partition mapping is defined.
@@ -111,7 +114,7 @@ func (tg *Target) Start(ctx context.Context) error {
111114
return ctx.Err()
112115

113116
// Queue the message to and flush if the batch size is reached.
114-
case msg, ok := <-tg.batchCh:
117+
case msg, ok := <-tg.inletCh:
115118
if !ok {
116119
// Flush and cleanup on exit.
117120
if err := tg.drain(); err != nil {
@@ -246,7 +249,7 @@ outerLoop:
246249
// drain drains and flushes any pending messages in the producer.
247250
func (tg *Target) drain() error {
248251
now := time.Now()
249-
for rec := range tg.batchCh {
252+
for rec := range tg.inletCh {
250253
tg.prepareRecord(rec)
251254
tg.batch = append(tg.batch, rec)
252255
}

0 commit comments

Comments
 (0)