Skip to content

Commit

Permalink
#update: handle rollback transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
tdatIT committed Feb 7, 2024
1 parent 0a5074d commit 12698d3
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 18 deletions.
6 changes: 6 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func startSubscribers(serv *server.Server, wg *sync.WaitGroup) {
serv.PurchaseCreateSub().ListenPurchaseCreate(wg)
}()

wg.Add(1)
go func() {
defer wg.Done()
serv.PurchaseCancelSub().ListenPurchaseCancel(wg)
}()

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type SagaOrderEvent struct {
Connection string
Exchange string
PublishRoutingKey string
CancelRoutingKey string
ReplyRoutingKey string
Queue string
}
Expand Down
3 changes: 2 additions & 1 deletion config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ RabbitMQ:
Exchange: order_transaction_ex
PublishRoutingKey: order.transaction.commit
ReplyRoutingKey: order.transaction.reply
CancelRoutingKey: order.transaction.cancel

SagaOrderProductEvent:
Exchange: order_transaction_ex
Expand All @@ -54,7 +55,7 @@ RabbitMQ:

SagaOrderPaymentEvent:
Exchange: order_transaction_ex
CommitRoutingKey: order.payment.create
CommitRoutingKey: order.payment.event
RollbackRoutingKey: order.payment.rollback
ReplyRoutingKey: order.payment.reply
Queue:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
github.com/google/wire v0.5.0
github.com/jinzhu/copier v0.4.0
github.com/rabbitmq/amqp091-go v1.9.0
github.com/redis/go-redis/v9 v9.4.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/viper v1.18.2
go.mongodb.org/mongo-driver v1.13.1

Expand Down Expand Up @@ -43,9 +45,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/redis/go-redis/v9 v9.4.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
11 changes: 2 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+Mn
github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -16,7 +18,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
Expand Down Expand Up @@ -49,9 +50,6 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
Expand All @@ -75,14 +73,10 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
Expand Down Expand Up @@ -223,7 +217,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
6 changes: 6 additions & 0 deletions internal/domain/message/cancel_order.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package message

type CancelOrderMessage struct {
OrderID string `json:"order_id"`
CancelStatus int `json:"cancel_status"`
}
15 changes: 11 additions & 4 deletions internal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ package server

import (
"encoding/json"
"github.com/ansrivas/fiberprometheus/v2"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/google/wire"
"latipe-transaction-service/config"
"latipe-transaction-service/internal/adapter"
"latipe-transaction-service/internal/api/handler"
Expand All @@ -19,17 +15,22 @@ import (
"latipe-transaction-service/internal/publisher"
"latipe-transaction-service/internal/service"
"latipe-transaction-service/internal/subscriber"
"latipe-transaction-service/internal/subscriber/cancelPurchase"
"latipe-transaction-service/internal/subscriber/createPurchase"
"latipe-transaction-service/pkgs/cache"
"latipe-transaction-service/pkgs/db/mongodb"
"latipe-transaction-service/pkgs/rabbitclient"

"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/google/wire"
)

type Server struct {
app *fiber.App
globalCfg *config.Config
purchaseReplySub *createPurchase.PurchaseReplySubscriber
purchaseCreateSub *createPurchase.PurchaseCreateOrchestratorSubscriber
purchaseCancelSub *cancelPurchase.PurchaseCancelOrchestratorSubscriber
checkTxStatus *cronjob.CheckingTxStatusCronJ
}

Expand All @@ -49,6 +50,10 @@ func (serv Server) PurchaseCreateSub() *createPurchase.PurchaseCreateOrchestrato
return serv.purchaseCreateSub
}

func (serv Server) PurchaseCancelSub() *cancelPurchase.PurchaseCancelOrchestratorSubscriber {
return serv.purchaseCancelSub
}

func (serv Server) CheckTxStatusCron() *cronjob.CheckingTxStatusCronJ {
return serv.checkTxStatus
}
Expand Down Expand Up @@ -77,6 +82,7 @@ func NewServer(
transRouter router.TransactionRouter,
purchaseReplySub *createPurchase.PurchaseReplySubscriber,
purchaseCreateSub *createPurchase.PurchaseCreateOrchestratorSubscriber,
purchaseCancelSub *cancelPurchase.PurchaseCancelOrchestratorSubscriber,
checkTxStatus *cronjob.CheckingTxStatusCronJ) *Server {

app := fiber.New(fiber.Config{
Expand Down Expand Up @@ -112,6 +118,7 @@ func NewServer(
app: app,
purchaseReplySub: purchaseReplySub,
purchaseCreateSub: purchaseCreateSub,
purchaseCancelSub: purchaseCancelSub,
checkTxStatus: checkTxStatus,
}
}
1 change: 1 addition & 0 deletions internal/service/orderserv/order_serv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type OrderService interface {
StartPurchaseTransaction(ctx context.Context, message *message.OrderPendingMessage) error
CancelOrder(ctx context.Context, message *message.CancelOrderMessage) error
HandleTransactionPurchaseReply(ctx context.Context, message *message.CreateOrderReplyMessage, serviceType int) error
RollbackTransactionPub(dao *entities.TransactionLog) error
CheckTransactionStatus(ctx context.Context) error
Expand Down
53 changes: 52 additions & 1 deletion internal/service/orderserv/orderserv_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ func (o orderService) HandleTransactionPurchaseReply(ctx context.Context, msg *m

// Rollback the transaction where the service has sent message has a status of success
// and transaction status is failed

if msg.Status == message.COMMIT_SUCCESS {

switch trans.TransactionStatus {
Expand Down Expand Up @@ -362,3 +361,55 @@ func (o orderService) CheckTransactionStatus(ctx context.Context) error {

return err
}

func (o orderService) CancelOrder(ctx context.Context, req *message.CancelOrderMessage) error {
msg := message.RollbackPurchaseMessage{
Status: req.CancelStatus,
OrderID: req.OrderID,
}

// Create channels for error handling
errCh := make(chan error, 4) // number of messages to send

// create a wait group to wait for all goroutines to finish
var wg sync.WaitGroup
wg.Add(4) // number of goroutines is equal to the number of message types

// define a function to send messages and handle errors
handlerMessageGoroutine := func(fn func() error) {
defer wg.Done() // decrement the wait group counter when the goroutine finishes
if err := fn(); err != nil {
errCh <- err
}
}

// start goroutines for each message type and set cache message
go handlerMessageGoroutine(func() error {
return o.transactionOrchestrator.RollbackPurchaseDeliveryMessage(&msg)
})

go handlerMessageGoroutine(func() error {
return o.transactionOrchestrator.RollbackPurchaseProductMessage(&msg)
})

go handlerMessageGoroutine(func() error {
return o.transactionOrchestrator.RollbackPurchasePromotionMessage(&msg)
})

go handlerMessageGoroutine(func() error {
return o.transactionOrchestrator.RollbackPurchasePaymentMessage(&msg)
})

// wait for all goroutines to finish
wg.Wait()

// close the error channel after all goroutines have finished
close(errCh)

// handle any remaining errors in the error channel
for err := range errCh {
log.Errorf("Publish message failed: %v", err)
}

return nil
}
121 changes: 121 additions & 0 deletions internal/subscriber/cancelPurchase/purchase_cancel_orchestrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package cancelPurchase

import (
"context"
"encoding/json"
"fmt"
"github.com/gofiber/fiber/v2/log"
amqp "github.com/rabbitmq/amqp091-go"
"latipe-transaction-service/config"
"latipe-transaction-service/internal/domain/message"
"latipe-transaction-service/internal/service/orderserv"
"sync"
"time"
)

type PurchaseCancelOrchestratorSubscriber struct {
config *config.Config
orderServ orderserv.OrderService
conn *amqp.Connection
}

func NewPurchaseCancelOrchestratorSubscriber(cfg *config.Config,
orderServ orderserv.OrderService, conn *amqp.Connection) *PurchaseCancelOrchestratorSubscriber {
return &PurchaseCancelOrchestratorSubscriber{
config: cfg,
orderServ: orderServ,
conn: conn,
}
}

func (orch PurchaseCancelOrchestratorSubscriber) ListenPurchaseCancel(wg *sync.WaitGroup) {
channel, err := orch.conn.Channel()
defer channel.Close()

// define an exchange type "topic"
err = channel.ExchangeDeclare(
orch.config.RabbitMQ.SagaOrderEvent.Exchange,
"topic",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("cannot declare exchange: %v", err)
}

// create queue
q, err := channel.QueueDeclare(
"purchase_cancel_event",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("cannot declare queue: %v", err)
}

err = channel.QueueBind(
q.Name,
orch.config.RabbitMQ.SagaOrderEvent.CancelRoutingKey,
orch.config.RabbitMQ.SagaOrderEvent.Exchange,
false,
nil)
if err != nil {
log.Fatalf("cannot bind exchange: %v", err)
}

// declaring consumer with its properties over channel opened
msgs, err := channel.Consume(
q.Name, // queue
orch.config.RabbitMQ.ServiceName, // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, //args
)
if err != nil {
panic(err)
}

defer wg.Done()
// handle consumed messages from queue
for msg := range msgs {
log.Infof("received order message from: %s", msg.RoutingKey)
if err := orch.handleMessage(&msg); err != nil {
log.Infof("The order cancel failed cause %s", err)
}
}

log.Infof("message queue has started")
log.Infof("waiting for messages...")
}

func (orch PurchaseCancelOrchestratorSubscriber) handleMessage(msg *amqp.Delivery) error {
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

messageDTO := message.CancelOrderMessage{}

if err := json.Unmarshal(msg.Body, &messageDTO); err != nil {
log.Infof("Parse message to order failed cause: %s", err)
return err
}

err := orch.orderServ.CancelOrder(ctx, &messageDTO)
if err != nil {
log.Infof("Handling reply message was failed cause: %s", err)
return err
}

endTime := time.Now()
log.Infof("The orders [checkout_id: %v] was processed successfully - duration:%v", messageDTO.OrderID, endTime.Sub(startTime))
fmt.Println()
return nil
}
20 changes: 20 additions & 0 deletions internal/subscriber/cancelPurchase/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cancelPurchase

import (
"latipe-transaction-service/config"
"latipe-transaction-service/internal/service/orderserv"
)

func MappingRoutingKeyToService(routingKey string, config *config.Config) int {
switch routingKey {
case config.RabbitMQ.SagaOrderProductEvent.ReplyRoutingKey:
return orderserv.PRODUCT_SERVICE
case config.RabbitMQ.SagaOrderDeliveryEvent.ReplyRoutingKey:
return orderserv.DELIVERY_SERVICE
case config.RabbitMQ.SagaOrderPromotionEvent.ReplyRoutingKey:
return orderserv.PROMOTION_SERVICE
case config.RabbitMQ.SagaOrderPaymentEvent.ReplyRoutingKey:
return orderserv.PAYMENT_SERVICE
}
return -1
}
Loading

0 comments on commit 12698d3

Please sign in to comment.