@@ -229,7 +229,7 @@ func (sp *SourcePool) Close() {
229
229
230
230
// newConn initializes a new consumer group config.
231
231
func (sp * SourcePool ) newConn (ctx context.Context , s Server ) (* kgo.Client , error ) {
232
- sp .log .Debug ("running TCP health check" , "id" , s .ID , "server" , s .Config .BootstrapBrokers )
232
+ sp .log .Debug ("running TCP health check" , "id" , s .ID , "server" , s .Config .BootstrapBrokers , "session_timeout" , s . Config . SessionTimeout )
233
233
if ok := checkTCP (ctx , s .Config .BootstrapBrokers , s .Config .SessionTimeout ); ! ok {
234
234
return nil , ErrorNoHealthy
235
235
}
@@ -297,12 +297,14 @@ func (sp *SourcePool) healthcheck(ctx context.Context, signal chan struct{}) err
297
297
298
298
// For the first ever check, clients will be nil.
299
299
if clients [i ] == nil {
300
+ sp .log .Debug ("initializing admin client for background check" , "id" , s .ID , "server" , s .Config .BootstrapBrokers )
300
301
cl , err := sp .initConsumerClient (s .Config )
301
302
if err != nil {
302
303
sp .log .Error ("error initializing admin client in background healthcheck" , "id" , s .ID , "server" , s .Config .BootstrapBrokers , "error" , err )
303
304
continue
304
305
}
305
306
307
+ sp .log .Debug ("initialized admin client for background check" , "id" , s .ID , "server" , s .Config .BootstrapBrokers )
306
308
clients [i ] = cl
307
309
}
308
310
@@ -314,6 +316,7 @@ func (sp *SourcePool) healthcheck(ctx context.Context, signal chan struct{}) err
314
316
315
317
// Get the highest offset of all the topics on the source server and sum them up
316
318
// to derive the weight of the server.
319
+ sp .log .Debug ("getting high watermark via admin client for background check" , "id" , idx )
317
320
offsets , err := sp .GetHighWatermark (ctx , clients [idx ])
318
321
if err != nil && offsets == nil {
319
322
sp .log .Error ("error fetching offset in background healthcheck" , "id" , s .ID , "server" , s .Config .BootstrapBrokers , "error" , err )
@@ -506,6 +509,7 @@ func (sp *SourcePool) setWeight(id int, weight int64) {
506
509
sp .curCandidate = s
507
510
}
508
511
512
+ sp .log .Debug ("setting candidate weight" , "id" , id , "weight" , weight , "curr" , sp .curCandidate )
509
513
sp .servers [id ] = s
510
514
break
511
515
}
0 commit comments