Skip to content

Commit

Permalink
feat (SRS-06) : improve generic consumer queues
Browse files Browse the repository at this point in the history
  • Loading branch information
PickHD committed May 8, 2023
1 parent 6fd24f4 commit 63ebd14
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
11 changes: 10 additions & 1 deletion shortener/cmd/v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ func main() {
app.Logger.Error("Error received by channel", err)
}
case consumerMode:
infrastructure.ConsumeMessages(app, app.Config.RabbitMQ.QueueCreateShortener)
// Make a channel to receive messages into infinite loop.
forever := make(chan bool)

queues := []string{app.Config.RabbitMQ.QueueCreateShortener, app.Config.RabbitMQ.QueueUpdateVisitor}

for _, q := range queues {
go infrastructure.ConsumeMessages(app, q)
}

<-forever
}
}
27 changes: 20 additions & 7 deletions shortener/internal/v1/infrastructure/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package infrastructure

import (
"encoding/json"
"fmt"

"github.com/PickHD/singkatin-revamp/shortener/internal/v1/application"
"github.com/PickHD/singkatin-revamp/shortener/internal/v1/model"
)

// ConsumeMessages generic function to consume message from defined param queues
func ConsumeMessages(app *application.App, queueName string) {
dep := application.SetupDependencyInjection(app)

Expand All @@ -26,9 +28,6 @@ func ConsumeMessages(app *application.App, queueName string) {

app.Logger.Info("Waiting Message in Queues ", queueName, ".....")

// Make a channel to receive messages into infinite loop.
forever := make(chan bool)

go func() {
for msg := range messages {
switch queueName {
Expand All @@ -40,18 +39,32 @@ func ConsumeMessages(app *application.App, queueName string) {
app.Logger.Error("Unmarshal JSON ERROR, ", err)
}

app.Logger.Info("Success Consume Message :", req)
app.Logger.Info(fmt.Sprintf("[%s] Success Consume Message :", queueName), req)

err = dep.ShortController.ProcessCreateShortUser(app.Context, &req)
if err != nil {
app.Logger.Error("ProcessCreateShortUser ERROR, ", err)
}

app.Logger.Info("Success Process Message : ", req)
app.Logger.Info(fmt.Sprintf("[%s] Success Process Message :", queueName), req)
case app.Config.RabbitMQ.QueueUpdateVisitor:
var req model.UpdateVisitorRequest

err := json.Unmarshal(msg.Body, &req)
if err != nil {
app.Logger.Error("Unmarshal JSON ERROR, ", err)
}

app.Logger.Info(fmt.Sprintf("[%s] Success Consume Message :", queueName), req)

err = dep.ShortController.ProcessUpdateVisitorCount(app.Context, &req)
if err != nil {
app.Logger.Error("ProcessUpdateVisitorCount ERROR, ", err)
}

app.Logger.Info(fmt.Sprintf("[%s] Success Process Message :", queueName), req)

}
}
}()

<-forever
}

0 comments on commit 63ebd14

Please sign in to comment.