From 33fa97dff2acdd524904f6ad946e824895473bb5 Mon Sep 17 00:00:00 2001 From: Deepanshu Mehra Date: Mon, 25 Nov 2024 10:38:11 +0530 Subject: [PATCH] feat : added logic for consuming events --- listener-service/event/consumer.go | 130 +++++++++++++++++++++++++++++ listener-service/event/event.go | 13 +++ listener-service/main.go | 45 ++++++++++ 3 files changed, 188 insertions(+) create mode 100644 listener-service/event/consumer.go create mode 100644 listener-service/event/event.go diff --git a/listener-service/event/consumer.go b/listener-service/event/consumer.go new file mode 100644 index 0000000..ff827b3 --- /dev/null +++ b/listener-service/event/consumer.go @@ -0,0 +1,130 @@ +package event + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type Consumer struct { + conn *amqp.Connection + queueName string +} + +func NewConsumer(conn *amqp.Connection) (Consumer, error) { + consumer := Consumer{ + conn: conn, + } + + err := consumer.setup() + if err != nil { + return Consumer{}, err + } + + return consumer, nil +} + +func (consumer *Consumer) setup() error { + channel, err := consumer.conn.Channel() + if err != nil { + return err + } + + return declareExchange(channel) +} + +type Payload struct { + Name string `json:"name"` + Data string `json:"data"` +} + +func (consumer *Consumer) Listen(topics []string) error { + ch, err := consumer.conn.Channel() + if err != nil { + return err + } + + defer ch.Close() + + q, err := declareRandomQueue(ch) + if err != nil { + return err + } + + for _, s := range topics { + ch.QueueBind(q.Name, s, "logs_topic", false, nil) + } + + messages, err := ch.Consume(q.Name, "", true, false, false, false, nil) + if err != nil { + return err + } + + forever := make(chan bool) + go func() { + for d := range messages { + var payload Payload + _ = json.Unmarshal(d.Body, &payload) + + go handlePayload(payload) + } + }() + + fmt.Printf("Waiting for message [Exchange, Queue] [logs_topic, %s]\n", q.Name) + <-forever + + return nil +} + +func handlePayload(payload Payload) { + switch payload.Name { + case "log", "event": + // log whatever we get + err := logEvent(payload) + if err != nil { + log.Println(err) + } + + case "auth": + // authenticate + + // you can have as many cases as you want, as long as you write the logic + + default: + err := logEvent(payload) + if err != nil { + log.Println(err) + } + } +} + +func logEvent(entry Payload) error { + jsonData, _ := json.MarshalIndent(entry, "", "\t") + + logServiceURL := "http://logger-service:8080/log" + + request, err := http.NewRequest("POST", logServiceURL, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + + request.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + + response, err := client.Do(request) + if err != nil { + return err + } + defer response.Body.Close() + + if response.StatusCode != http.StatusAccepted { + return err + } + + return nil +} diff --git a/listener-service/event/event.go b/listener-service/event/event.go new file mode 100644 index 0000000..c0feec9 --- /dev/null +++ b/listener-service/event/event.go @@ -0,0 +1,13 @@ +package event + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +func declareExchange(ch *amqp.Channel) error { + return ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil) +} + +func declareRandomQueue(ch *amqp.Channel) (amqp.Queue, error) { + return ch.QueueDeclare("", false, false, true, false, nil) +} diff --git a/listener-service/main.go b/listener-service/main.go index 7905807..d02dc93 100644 --- a/listener-service/main.go +++ b/listener-service/main.go @@ -1,5 +1,50 @@ package main +import ( + "fmt" + "log" + "math" + "os" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + func main() { + rabbitConn, err := connect() + if err != nil { + log.Println(err) + os.Exit(1) + } + + defer rabbitConn.Close() + log.Println("Connected to RabbitMQ") +} + +func connect() (*amqp.Connection, error) { + var counts int64 + var backOff = 1 * time.Second + var connection *amqp.Connection + + for { + conn, err := amqp.Dial("amqp://guest:guest@localhost") + if err != nil { + fmt.Println("RabbitMQ not yet ready...") + counts++ + } else { + connection = conn + break + } + + if counts > 5 { + fmt.Println(err) + return nil, err + } + + backOff = time.Duration(math.Pow(float64(counts), 2)) * time.Second + log.Println("backing off...") + time.Sleep(backOff) + } + return connection, nil }