@@ -68,11 +68,13 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
68
68
id : l .Registration .CanonicalName ,
69
69
brokerAddr : k .BootstrapServers ,
70
70
topic : "quickstart-events" ,
71
+ logger : logger ,
71
72
}
72
73
73
74
config := sarama .NewConfig ()
74
75
//TODO: this is basic set up for now - will need to add more options to config
75
76
//once we know what we are allowing users to send
77
+
76
78
config .Producer .Return .Successes = true
77
79
config .Producer .RequiredAcks = sarama .WaitForAll
78
80
config .Producer .Retry .Max = c .DeliveryRetries //should we be using retryhint for this?
@@ -344,9 +346,9 @@ func (k *Kafka) send(secret string, acceptType string, msg *wrp.Message) error {
344
346
345
347
// Send the message to Kafka
346
348
partition , offset , err := k .producer .SendMessage (kafkaMsg )
349
+ defer k .producer .Close ()
347
350
if err != nil {
348
351
k .logger .Error ("Failed to send message to Kafka" , zap .Error (err ))
349
- k .producer .Close ()
350
352
return err
351
353
}
352
354
@@ -356,7 +358,6 @@ func (k *Kafka) send(secret string, acceptType string, msg *wrp.Message) error {
356
358
zap .Int32 ("Partition" , partition ),
357
359
zap .Int64 ("Offset" , offset ),
358
360
)
359
- k .producer .Close ()
360
361
361
362
return nil
362
363
0 commit comments