Skip to content

Commit

Permalink
feat : added logic for consuming events
Browse files Browse the repository at this point in the history
  • Loading branch information
dmehra2102 committed Nov 25, 2024
1 parent 18d61ca commit 33fa97d
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
130 changes: 130 additions & 0 deletions listener-service/event/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package event

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"

amqp "github.com/rabbitmq/amqp091-go"
)

type Consumer struct {
conn *amqp.Connection
queueName string
}

func NewConsumer(conn *amqp.Connection) (Consumer, error) {
consumer := Consumer{
conn: conn,
}

err := consumer.setup()
if err != nil {
return Consumer{}, err
}

return consumer, nil
}

func (consumer *Consumer) setup() error {
channel, err := consumer.conn.Channel()
if err != nil {
return err
}

return declareExchange(channel)
}

type Payload struct {
Name string `json:"name"`
Data string `json:"data"`
}

func (consumer *Consumer) Listen(topics []string) error {
ch, err := consumer.conn.Channel()
if err != nil {
return err
}

defer ch.Close()

q, err := declareRandomQueue(ch)
if err != nil {
return err
}

for _, s := range topics {
ch.QueueBind(q.Name, s, "logs_topic", false, nil)
}

messages, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
return err
}

forever := make(chan bool)
go func() {
for d := range messages {
var payload Payload
_ = json.Unmarshal(d.Body, &payload)

go handlePayload(payload)
}
}()

fmt.Printf("Waiting for message [Exchange, Queue] [logs_topic, %s]\n", q.Name)
<-forever

return nil
}

func handlePayload(payload Payload) {
switch payload.Name {
case "log", "event":
// log whatever we get
err := logEvent(payload)
if err != nil {
log.Println(err)
}

case "auth":
// authenticate

// you can have as many cases as you want, as long as you write the logic

default:
err := logEvent(payload)
if err != nil {
log.Println(err)
}
}
}

func logEvent(entry Payload) error {
jsonData, _ := json.MarshalIndent(entry, "", "\t")

logServiceURL := "http://logger-service:8080/log"

request, err := http.NewRequest("POST", logServiceURL, bytes.NewBuffer(jsonData))
if err != nil {
return err
}

request.Header.Set("Content-Type", "application/json")

client := &http.Client{}

response, err := client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()

if response.StatusCode != http.StatusAccepted {
return err
}

return nil
}
13 changes: 13 additions & 0 deletions listener-service/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package event

import (
amqp "github.com/rabbitmq/amqp091-go"
)

func declareExchange(ch *amqp.Channel) error {
return ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil)
}

func declareRandomQueue(ch *amqp.Channel) (amqp.Queue, error) {
return ch.QueueDeclare("", false, false, true, false, nil)
}
45 changes: 45 additions & 0 deletions listener-service/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,50 @@
package main

import (
"fmt"
"log"
"math"
"os"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
rabbitConn, err := connect()
if err != nil {
log.Println(err)
os.Exit(1)
}

defer rabbitConn.Close()
log.Println("Connected to RabbitMQ")
}

func connect() (*amqp.Connection, error) {
var counts int64
var backOff = 1 * time.Second
var connection *amqp.Connection

for {
conn, err := amqp.Dial("amqp://guest:guest@localhost")
if err != nil {
fmt.Println("RabbitMQ not yet ready...")
counts++
} else {
connection = conn
break
}

if counts > 5 {
fmt.Println(err)
return nil, err
}

backOff = time.Duration(math.Pow(float64(counts), 2)) * time.Second
log.Println("backing off...")
time.Sleep(backOff)
}

return connection, nil
}

0 comments on commit 33fa97d

Please sign in to comment.