Skip to content

Commit

Permalink
inital hashing logic added to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Oct 4, 2024
1 parent 1685347 commit 604dd83
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 31 deletions.
23 changes: 0 additions & 23 deletions cmd/main.go

This file was deleted.

81 changes: 81 additions & 0 deletions internal/sink/hasher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package sink

import (
"fmt"
"hash/crc32"
"reflect"
"sort"

"github.com/xmidt-org/wrp-go/v3"
)

type Node struct {
hash int
sink string
}

type HashRing []Node

func (h HashRing) Len() int {
return len(h)
}
func (h HashRing) Less(i, j int) bool {
return h[i].hash < h[j].hash
}
func (h HashRing) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h HashRing) GetServer(key string) string {
if len(h) == 0 {
return ""
}
hash := int(crc32.ChecksumIEEE([]byte(key)))
idx := sort.Search(len(h), func(i int) bool {
return h[i].hash >= hash
})
if idx == len(h) {
idx = 0
}
return h[idx].sink
}

func (h *HashRing) AddServer(server string) {
hash := int(crc32.ChecksumIEEE([]byte(server)))
node := Node{hash: hash, sink: server}
*h = append(*h, node)
sort.Sort(h)
}

func (h *HashRing) RemoveServer(server string) {
hash := int(crc32.ChecksumIEEE([]byte(server)))
for i, node := range *h {
if node.hash == hash {
*h = append((*h)[:i], (*h)[i+1:]...)
break
}
}
sort.Sort(h)
}

func NewRing() *HashRing {
return &HashRing{}
}

func GetKey(field string, msg *wrp.Message) string {

v := reflect.ValueOf(msg)
if v.Kind() == reflect.Ptr {
v = v.Elem() // Dereference pointer if necessary
}

value := v.FieldByName(field)
if value.IsValid() {
return fmt.Sprintf("%v", value.Interface())
}

return ""

}
33 changes: 25 additions & 8 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ type WebhookV1 struct {
type Webhooks []*WebhookV1
type CommonWebhook struct {
id string
hashField string
failureUrl string
deliveryInterval time.Duration
deliveryRetries int
mutex sync.RWMutex
logger *zap.Logger
}
type Kafkas []*Kafka
type KafkaSink struct {
Kafkas []*Kafka
Hash *HashRing
HashField string
}
type Kafka struct {
brokerAddr []string
topic string
Expand All @@ -76,11 +81,15 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
return whs
}
if len(l.Registration.Kafkas) > 0 {
var sink Kafkas
for _, k := range l.Registration.Kafkas {
var sink KafkaSink
r := NewRing()
sink.HashField = l.Registration.Hash.Field
for i, k := range l.Registration.Kafkas {
kafka := &Kafka{}
kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
sink = append(sink, kafka)
sink.Kafkas = append(sink.Kafkas, kafka)
key := l.Registration.Hash.Field + strconv.Itoa(i)
r.AddServer(key)
}
return sink
}
Expand All @@ -91,8 +100,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
}

func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) (err error) {
//TODO: is there anything else that needs to be done for this?
//do we need to return an error
//TODO: do we need to return an error if not - we should get rid of the error return
v1.id = id
v1.failureUrl = failureUrl
v1.deliveryInterval = c.DeliveryInterval
Expand Down Expand Up @@ -338,11 +346,20 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger *
return nil
}

func (k Kafkas) Send(secret string, acceptType string, msg *wrp.Message) error {
func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error {
key := GetKey(k.HashField, msg)
for i, kafka := range k.Kafkas {
hash := k.HashField + strconv.Itoa(i)
server := k.Hash.GetServer(key)
if server == hash {
_ = kafka.send(secret, acceptType, msg)
}

}
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
var errs error
for _, kafka := range k {
for _, kafka := range k.Kafkas {
err := kafka.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
Expand Down

0 comments on commit 604dd83

Please sign in to comment.