diff --git a/shortener/cmd/v1/main.go b/shortener/cmd/v1/main.go index 29f7027..365db9e 100644 --- a/shortener/cmd/v1/main.go +++ b/shortener/cmd/v1/main.go @@ -138,6 +138,15 @@ func main() { app.Logger.Error("Error received by channel", err) } case consumerMode: - infrastructure.ConsumeMessages(app, app.Config.RabbitMQ.QueueCreateShortener) + // Make a channel to receive messages into infinite loop. + forever := make(chan bool) + + queues := []string{app.Config.RabbitMQ.QueueCreateShortener, app.Config.RabbitMQ.QueueUpdateVisitor} + + for _, q := range queues { + go infrastructure.ConsumeMessages(app, q) + } + + <-forever } } diff --git a/shortener/internal/v1/infrastructure/rabbitmq.go b/shortener/internal/v1/infrastructure/rabbitmq.go index a2d35b9..f791d9a 100644 --- a/shortener/internal/v1/infrastructure/rabbitmq.go +++ b/shortener/internal/v1/infrastructure/rabbitmq.go @@ -2,11 +2,13 @@ package infrastructure import ( "encoding/json" + "fmt" "github.com/PickHD/singkatin-revamp/shortener/internal/v1/application" "github.com/PickHD/singkatin-revamp/shortener/internal/v1/model" ) +// ConsumeMessages generic function to consume message from defined param queues func ConsumeMessages(app *application.App, queueName string) { dep := application.SetupDependencyInjection(app) @@ -26,9 +28,6 @@ func ConsumeMessages(app *application.App, queueName string) { app.Logger.Info("Waiting Message in Queues ", queueName, ".....") - // Make a channel to receive messages into infinite loop. - forever := make(chan bool) - go func() { for msg := range messages { switch queueName { @@ -40,18 +39,32 @@ func ConsumeMessages(app *application.App, queueName string) { app.Logger.Error("Unmarshal JSON ERROR, ", err) } - app.Logger.Info("Success Consume Message :", req) + app.Logger.Info(fmt.Sprintf("[%s] Success Consume Message :", queueName), req) err = dep.ShortController.ProcessCreateShortUser(app.Context, &req) if err != nil { app.Logger.Error("ProcessCreateShortUser ERROR, ", err) } - app.Logger.Info("Success Process Message : ", req) + app.Logger.Info(fmt.Sprintf("[%s] Success Process Message :", queueName), req) case app.Config.RabbitMQ.QueueUpdateVisitor: + var req model.UpdateVisitorRequest + + err := json.Unmarshal(msg.Body, &req) + if err != nil { + app.Logger.Error("Unmarshal JSON ERROR, ", err) + } + + app.Logger.Info(fmt.Sprintf("[%s] Success Consume Message :", queueName), req) + + err = dep.ShortController.ProcessUpdateVisitorCount(app.Context, &req) + if err != nil { + app.Logger.Error("ProcessUpdateVisitorCount ERROR, ", err) + } + + app.Logger.Info(fmt.Sprintf("[%s] Success Process Message :", queueName), req) + } } }() - - <-forever }