diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index c30113a3..00000000 --- a/cmd/main.go +++ /dev/null @@ -1,23 +0,0 @@ -// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package main - -import ( - "fmt" - "os" - - "github.com/xmidt-org/caduceus" -) - -func main() { - - err := caduceus.Caduceus(os.Args[1:], true) - - if err == nil { - return - } - - fmt.Fprintln(os.Stderr, err) - os.Exit(-1) -} diff --git a/internal/sink/hasher.go b/internal/sink/hasher.go new file mode 100644 index 00000000..6a1dc0ec --- /dev/null +++ b/internal/sink/hasher.go @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 +package sink + +import ( + "fmt" + "hash/crc32" + "reflect" + "sort" + + "github.com/xmidt-org/wrp-go/v3" +) + +type Node struct { + hash int + sink string +} + +type HashRing []Node + +func (h HashRing) Len() int { + return len(h) +} +func (h HashRing) Less(i, j int) bool { + return h[i].hash < h[j].hash +} +func (h HashRing) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h HashRing) GetServer(key string) string { + if len(h) == 0 { + return "" + } + hash := int(crc32.ChecksumIEEE([]byte(key))) + idx := sort.Search(len(h), func(i int) bool { + return h[i].hash >= hash + }) + if idx == len(h) { + idx = 0 + } + return h[idx].sink +} + +func (h *HashRing) AddServer(server string) { + hash := int(crc32.ChecksumIEEE([]byte(server))) + node := Node{hash: hash, sink: server} + *h = append(*h, node) + sort.Sort(h) +} + +func (h *HashRing) RemoveServer(server string) { + hash := int(crc32.ChecksumIEEE([]byte(server))) + for i, node := range *h { + if node.hash == hash { + *h = append((*h)[:i], (*h)[i+1:]...) + break + } + } + sort.Sort(h) +} + +func NewRing() *HashRing { + return &HashRing{} +} + +func GetKey(field string, msg *wrp.Message) string { + + v := reflect.ValueOf(msg) + if v.Kind() == reflect.Ptr { + v = v.Elem() // Dereference pointer if necessary + } + + value := v.FieldByName(field) + if value.IsValid() { + return fmt.Sprintf("%v", value.Interface()) + } + + return "" + +} diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 373aa6ee..294d50ac 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -44,13 +44,18 @@ type WebhookV1 struct { type Webhooks []*WebhookV1 type CommonWebhook struct { id string + hashField string failureUrl string deliveryInterval time.Duration deliveryRetries int mutex sync.RWMutex logger *zap.Logger } -type Kafkas []*Kafka +type KafkaSink struct { + Kafkas []*Kafka + Hash *HashRing + HashField string +} type Kafka struct { brokerAddr []string topic string @@ -76,11 +81,15 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { return whs } if len(l.Registration.Kafkas) > 0 { - var sink Kafkas - for _, k := range l.Registration.Kafkas { + var sink KafkaSink + r := NewRing() + sink.HashField = l.Registration.Hash.Field + for i, k := range l.Registration.Kafkas { kafka := &Kafka{} kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) - sink = append(sink, kafka) + sink.Kafkas = append(sink.Kafkas, kafka) + key := l.Registration.Hash.Field + strconv.Itoa(i) + r.AddServer(key) } return sink } @@ -91,8 +100,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { } func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) (err error) { - //TODO: is there anything else that needs to be done for this? - //do we need to return an error + //TODO: do we need to return an error if not - we should get rid of the error return v1.id = id v1.failureUrl = failureUrl v1.deliveryInterval = c.DeliveryInterval @@ -338,11 +346,20 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger * return nil } -func (k Kafkas) Send(secret string, acceptType string, msg *wrp.Message) error { +func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error { + key := GetKey(k.HashField, msg) + for i, kafka := range k.Kafkas { + hash := k.HashField + strconv.Itoa(i) + server := k.Hash.GetServer(key) + if server == hash { + _ = kafka.send(secret, acceptType, msg) + } + + } //TODO: discuss with wes and john the default hashing logic //for now: when no hash is given we will just loop through all the kafkas var errs error - for _, kafka := range k { + for _, kafka := range k.Kafkas { err := kafka.send(secret, acceptType, msg) if err != nil { errs = errors.Join(errs, err)