Skip to content

Commit

Permalink
#add: implement reply message
Browse files Browse the repository at this point in the history
  • Loading branch information
tdatIT committed Jan 26, 2024
1 parent 1438d1d commit b82b57c
Show file tree
Hide file tree
Showing 17 changed files with 233 additions and 32 deletions.
8 changes: 0 additions & 8 deletions .idea/.gitignore

This file was deleted.

Binary file modified build/server_app_win.exe
Binary file not shown.
68 changes: 48 additions & 20 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,64 @@ func main() {
log.Fatalf("%s", err)
}

//subscriber
var wg sync.WaitGroup
//publish transaction
{
wg.Add(1)
go serv.PurchaseCreateSub().ListenProductPurchaseCreate(&wg)
}
//waiting reply
{
wg.Add(1)
go serv.PurchaseReplySub().ListenProductPurchaseReply(&wg)

wg.Add(1)
go serv.PurchaseReplySub().ListenPromotionPurchaseReply(&wg)
startSubscribers(serv, &wg)
startCronJobs(serv, &wg)
startAPIHandler(serv, &wg)

wg.Add(1)
go serv.PurchaseReplySub().ListenPaymentPurchaseReply(&wg)
wg.Wait()
}

wg.Add(1)
go serv.PurchaseReplySub().ListenDeliveryPurchaseReply(&wg)
}
func startSubscribers(serv *server.Server, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
serv.PurchaseCreateSub().ListenProductPurchaseCreate(wg)
}()

wg.Add(1)
go func() {
defer wg.Done()
serv.PurchaseReplySub().ListenProductPurchaseReply(wg)
}()

wg.Add(1)
go func() {
defer wg.Done()
serv.PurchaseReplySub().ListenPromotionPurchaseReply(wg)
}()

wg.Add(1)
go func() {
defer wg.Done()
serv.PurchaseReplySub().ListenPaymentPurchaseReply(wg)
}()

wg.Add(1)
go func() {
defer wg.Done()
serv.PurchaseReplySub().ListenDeliveryPurchaseReply(wg)
}()
}

//api handler
func startCronJobs(serv *server.Server, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
err := serv.CheckTxStatusCron().StartJob(wg)
if err != nil {
log.Error(err)
}
}()
}

func startAPIHandler(serv *server.Server, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
if err := serv.App().Listen(serv.Config().Server.Port); err != nil {
fmt.Printf("%s", err)
}
}()

wg.Wait()
}
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
//Adapters Adapters
AdapterService AdapterService
RabbitMQ RabbitMQ
CronJob CronJob
}

type Server struct {
Expand All @@ -38,6 +39,10 @@ type Server struct {
ExpirationLimitTime time.Duration // expiration time of the limit
}

type CronJob struct {
CheckingTxStatus string
}

type DB struct {
Mongodb Mongodb
}
Expand Down
5 changes: 4 additions & 1 deletion config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ RabbitMQ:
SagaOrderEvent:
Exchange: order_transaction_ex
PublishRoutingKey: order.transaction.commit
SubscriberRoutingKey: order.transaction.reply
ReplyRoutingKey: order.transaction.reply

SagaOrderProductEvent:
Exchange: order_transaction_ex
Expand Down Expand Up @@ -93,3 +93,6 @@ AdapterService:
PromotionService:
BaseURL: http://localhost:5010
InternalKey:

CronJob:
CheckingTxStatus: "@every 5m"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
Expand Down Expand Up @@ -46,6 +47,9 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
Expand All @@ -69,10 +73,14 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
Expand All @@ -88,6 +96,8 @@ github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
Expand Down Expand Up @@ -209,6 +219,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
47 changes: 47 additions & 0 deletions internal/cronjob/check_tx_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cronjob

import (
"context"
"github.com/gofiber/fiber/v2/log"
"github.com/robfig/cron/v3"
"latipe-transaction-service/config"
"latipe-transaction-service/internal/service/orderserv"
"sync"
"time"
)

type CheckingTxStatusCronJ struct {
cfg *config.Config
cron *cron.Cron
orderService orderserv.OrderService
}

func NewCheckingTxStatusCronJ(cfg *config.Config, cron *cron.Cron, orderServ orderserv.OrderService) *CheckingTxStatusCronJ {
return &CheckingTxStatusCronJ{
cfg: cfg,
cron: cron,
orderService: orderServ,
}
}

func (cr *CheckingTxStatusCronJ) StartJob(wg *sync.WaitGroup) error {
ctxTimeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

log.Info("checking transaction status")
job, err := cr.cron.AddFunc(cr.cfg.CronJob.CheckingTxStatus, func() {
if err := cr.orderService.CheckTransactionStatus(ctxTimeout); err != nil {
log.Error(err)
}
})

if err != nil {
cr.cron.Remove(job)
return err
}

cr.cron.Run()

defer wg.Done()
return nil
}
15 changes: 15 additions & 0 deletions internal/cronjob/inject.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cronjob

import (
"github.com/google/wire"
"github.com/robfig/cron/v3"
)

var Set = wire.NewSet(
NewCronInstance,
NewCheckingTxStatusCronJ,
)

func NewCronInstance() *cron.Cron {
return cron.New()
}
19 changes: 19 additions & 0 deletions internal/domain/repos/trans_repos_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,22 @@ func (t *transactionRepository) UpdateTransaction(ctx context.Context, dao *enti

return nil
}

func (t *transactionRepository) FindAllPendingTransaction(ctx context.Context) ([]*entities.TransactionLog, error) {
var txs []*entities.TransactionLog
filter := bson.M{
"created_at": bson.M{"$gt": time.Now()},
"status": entities.TX_PENDING,
}

cursor, err := t.transCollection.Find(ctx, filter)
if err != nil {
return nil, err
}

if err := cursor.All(ctx, &txs); err != nil {
return nil, err
}

return txs, nil
}
1 change: 1 addition & 0 deletions internal/domain/repos/transaction_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ import (
type TransactionRepository interface {
CreateTransactionData(ctx context.Context, dao *entities.TransactionLog) error
FindByOrderID(ctx context.Context, orderID string) (*entities.TransactionLog, error)
FindAllPendingTransaction(ctx context.Context) ([]*entities.TransactionLog, error)
UpdateTransaction(ctx context.Context, dao *entities.TransactionLog, commit *entities.Commits) error
}
27 changes: 27 additions & 0 deletions internal/publisher/order_transaction_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ func NewOrderOrchestratorPub(cfg *config.Config) *OrderOrchestratorPub {
return &producer
}

func (pub *OrderOrchestratorPub) ReplyPurchaseMessage(message *message.CreateOrderReplyMessage) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body, err := ParseOrderToByte(&message)
if err != nil {
return err
}

log.Infof("Send message to queue %v - %v",
pub.cfg.RabbitMQ.SagaOrderEvent.Exchange,
pub.cfg.RabbitMQ.SagaOrderEvent.ReplyRoutingKey)

err = pub.channel.PublishWithContext(ctx,
pub.cfg.RabbitMQ.SagaOrderEvent.Exchange,
pub.cfg.RabbitMQ.SagaOrderEvent.ReplyRoutingKey,
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
})
failOnError(err, "Failed to publish a message")

return nil
}

func (pub *OrderOrchestratorPub) PublishPurchaseProductMessage(message *message.OrderProductMessage) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
11 changes: 10 additions & 1 deletion internal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/google/wire"
"latipe-transaction-service/config"
"latipe-transaction-service/internal/cronjob"
"latipe-transaction-service/internal/domain/repos"
"latipe-transaction-service/internal/publisher"
"latipe-transaction-service/internal/service"
Expand All @@ -22,6 +23,7 @@ type Server struct {
globalCfg *config.Config
purchaseReplySub *subscriber.PurchaseReplySubscriber
purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber
checkTxStatus *cronjob.CheckingTxStatusCronJ
}

func (serv Server) App() *fiber.App {
Expand All @@ -40,6 +42,10 @@ func (serv Server) PurchaseCreateSub() *subscriber.PurchaseCreateOrchestratorSub
return serv.purchaseCreateSub
}

func (serv Server) CheckTxStatusCron() *cronjob.CheckingTxStatusCronJ {
return serv.checkTxStatus
}

func New() (*Server, error) {
panic(wire.Build(wire.NewSet(
NewServer,
Expand All @@ -49,13 +55,15 @@ func New() (*Server, error) {
service.Set,
subscriber.Set,
publisher.Set,
cronjob.Set,
)))
}

func NewServer(
cfg *config.Config,
purchaseReplySub *subscriber.PurchaseReplySubscriber,
purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber) *Server {
purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber,
checkTxStatus *cronjob.CheckingTxStatusCronJ) *Server {

app := fiber.New(fiber.Config{
ReadTimeout: cfg.Server.ReadTimeout,
Expand Down Expand Up @@ -87,5 +95,6 @@ func NewServer(
app: app,
purchaseReplySub: purchaseReplySub,
purchaseCreateSub: purchaseCreateSub,
checkTxStatus: checkTxStatus,
}
}
1 change: 1 addition & 0 deletions internal/service/orderserv/order_serv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ type OrderService interface {
StartPurchaseTransaction(ctx context.Context, message *message.OrderPendingMessage) error
HandleTransactionPurchaseReply(ctx context.Context, message *message.CreateOrderReplyMessage, serviceType int) error
RollbackTransactionPub(dao *entities.TransactionLog) error
CheckTransactionStatus(ctx context.Context) error
}
23 changes: 23 additions & 0 deletions internal/service/orderserv/orderserv_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,26 @@ func (o orderService) rollbackPurchaseToService(dao *entities.Commits, orderId s

return nil
}

func (o orderService) CheckTransactionStatus(ctx context.Context) error {
txs, err := o.transactionRepo.FindAllPendingTransaction(ctx)
if err != nil {
return err
}

for _, i := range txs {
if IsCommitSuccess(i.Commits) {
replyMessage := message.CreateOrderReplyMessage{
Status: entities.TX_SUCCESS,
OrderID: i.OrderID,
}

err := o.transactionOrchestrator.ReplyPurchaseMessage(&replyMessage)
if err != nil {
log.Errorf("publish tx order [%v] message was failed %v", i.OrderID, err)
}
}
}

return err
}
Loading

0 comments on commit b82b57c

Please sign in to comment.