Skip to content

Commit

Permalink
fixed panic: added hashring to sink
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Oct 4, 2024
1 parent 604dd83 commit 7f7060c
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type WebhookV1 struct {
type Webhooks []*WebhookV1
type CommonWebhook struct {
id string
hashField string
failureUrl string
deliveryInterval time.Duration
deliveryRetries int
Expand Down Expand Up @@ -88,9 +87,12 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
kafka := &Kafka{}
kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
sink.Kafkas = append(sink.Kafkas, kafka)
key := l.Registration.Hash.Field + strconv.Itoa(i)
r.AddServer(key)
if l.Registration.Hash.Field != "" {
key := l.Registration.Hash.Field + strconv.Itoa(i)
r.AddServer(key)
}
}
sink.Hash = r
return sink
}
default:
Expand Down Expand Up @@ -347,22 +349,28 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger *
}

func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error {
key := GetKey(k.HashField, msg)
for i, kafka := range k.Kafkas {
hash := k.HashField + strconv.Itoa(i)
server := k.Hash.GetServer(key)
if server == hash {
_ = kafka.send(secret, acceptType, msg)
}

}
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
var errs error
for _, kafka := range k.Kafkas {
err := kafka.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
if k.HashField != "" {
key := GetKey(k.HashField, msg)
for i, kafka := range k.Kafkas {
hash := k.HashField + strconv.Itoa(i)
server := k.Hash.GetServer(key)
if server == hash {
err := kafka.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}

}
} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
for _, kafka := range k.Kafkas {
err := kafka.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
return errs
Expand Down

0 comments on commit 7f7060c

Please sign in to comment.