Skip to content

Commit f3cca64

Browse files
author
Lakshay Kalbhor
committed
fix: pass topics inside target struct
1 parent e6514f4 commit f3cca64

File tree

2 files changed

+7
-5
lines changed

2 files changed

+7
-5
lines changed

internal/relay/source_pool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
126126
// Set the current candidate with initial weight and a placeholder ID. This initial
127127
// weight ensures we resume consuming from where last left off. A real
128128
// healthy node should replace this via background checks
129+
sp.log.Debug("setting initial target node weight", "weight", w)
129130
sp.curCandidate = Server{
130131
Healthy: false,
131132
Weight: w,

internal/relay/target.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ type Target struct {
4141
// NewTarget returns a new producer relay that handles target Kafka instances.
4242
func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topics Topics, m *metrics.Set, log *slog.Logger) (*Target, error) {
4343
p := &Target{
44-
cfg: cfg,
45-
pCfg: pCfg,
46-
ctx: globalCtx,
47-
metrics: m,
48-
log: log,
44+
cfg: cfg,
45+
pCfg: pCfg,
46+
ctx: globalCtx,
47+
metrics: m,
48+
log: log,
49+
targetTopics: topics,
4950

5051
batch: make([]*kgo.Record, 0, pCfg.BatchSize),
5152
batchCh: make(chan *kgo.Record),

0 commit comments

Comments
 (0)