diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b8..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/build/server_app_win.exe b/build/server_app_win.exe index 64f4525..56c396c 100644 Binary files a/build/server_app_win.exe and b/build/server_app_win.exe differ diff --git a/cmd/main.go b/cmd/main.go index 2b52d30..fdd6316 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -16,29 +16,59 @@ func main() { log.Fatalf("%s", err) } - //subscriber var wg sync.WaitGroup - //publish transaction - { - wg.Add(1) - go serv.PurchaseCreateSub().ListenProductPurchaseCreate(&wg) - } - //waiting reply - { - wg.Add(1) - go serv.PurchaseReplySub().ListenProductPurchaseReply(&wg) - wg.Add(1) - go serv.PurchaseReplySub().ListenPromotionPurchaseReply(&wg) + startSubscribers(serv, &wg) + startCronJobs(serv, &wg) + startAPIHandler(serv, &wg) - wg.Add(1) - go serv.PurchaseReplySub().ListenPaymentPurchaseReply(&wg) + wg.Wait() +} - wg.Add(1) - go serv.PurchaseReplySub().ListenDeliveryPurchaseReply(&wg) - } +func startSubscribers(serv *server.Server, wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + serv.PurchaseCreateSub().ListenProductPurchaseCreate(wg) + }() + + wg.Add(1) + go func() { + defer wg.Done() + serv.PurchaseReplySub().ListenProductPurchaseReply(wg) + }() + + wg.Add(1) + go func() { + defer wg.Done() + serv.PurchaseReplySub().ListenPromotionPurchaseReply(wg) + }() + + wg.Add(1) + go func() { + defer wg.Done() + serv.PurchaseReplySub().ListenPaymentPurchaseReply(wg) + }() + + wg.Add(1) + go func() { + defer wg.Done() + serv.PurchaseReplySub().ListenDeliveryPurchaseReply(wg) + }() +} - //api handler +func startCronJobs(serv *server.Server, wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + err := serv.CheckTxStatusCron().StartJob(wg) + if err != nil { + log.Error(err) + } + }() +} + +func startAPIHandler(serv *server.Server, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() @@ -46,6 +76,4 @@ func main() { fmt.Printf("%s", err) } }() - - wg.Wait() } diff --git a/config/config.go b/config/config.go index 14ce1d3..85e937a 100644 --- a/config/config.go +++ b/config/config.go @@ -18,6 +18,7 @@ type Config struct { //Adapters Adapters AdapterService AdapterService RabbitMQ RabbitMQ + CronJob CronJob } type Server struct { @@ -38,6 +39,10 @@ type Server struct { ExpirationLimitTime time.Duration // expiration time of the limit } +type CronJob struct { + CheckingTxStatus string +} + type DB struct { Mongodb Mongodb } diff --git a/config/config.yml b/config/config.yml index f9e7369..d496808 100644 --- a/config/config.yml +++ b/config/config.yml @@ -33,7 +33,7 @@ RabbitMQ: SagaOrderEvent: Exchange: order_transaction_ex PublishRoutingKey: order.transaction.commit - SubscriberRoutingKey: order.transaction.reply + ReplyRoutingKey: order.transaction.reply SagaOrderProductEvent: Exchange: order_transaction_ex @@ -93,3 +93,6 @@ AdapterService: PromotionService: BaseURL: http://localhost:5010 InternalKey: + +CronJob: + CheckingTxStatus: "@every 5m" \ No newline at end of file diff --git a/go.mod b/go.mod index 69b5f0e..20b39d1 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect diff --git a/go.sum b/go.sum index 7111bd0..740cc2d 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,7 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= @@ -46,6 +47,9 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= @@ -69,10 +73,14 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= @@ -88,6 +96,8 @@ github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= @@ -209,6 +219,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cronjob/check_tx_status.go b/internal/cronjob/check_tx_status.go new file mode 100644 index 0000000..b272897 --- /dev/null +++ b/internal/cronjob/check_tx_status.go @@ -0,0 +1,47 @@ +package cronjob + +import ( + "context" + "github.com/gofiber/fiber/v2/log" + "github.com/robfig/cron/v3" + "latipe-transaction-service/config" + "latipe-transaction-service/internal/service/orderserv" + "sync" + "time" +) + +type CheckingTxStatusCronJ struct { + cfg *config.Config + cron *cron.Cron + orderService orderserv.OrderService +} + +func NewCheckingTxStatusCronJ(cfg *config.Config, cron *cron.Cron, orderServ orderserv.OrderService) *CheckingTxStatusCronJ { + return &CheckingTxStatusCronJ{ + cfg: cfg, + cron: cron, + orderService: orderServ, + } +} + +func (cr *CheckingTxStatusCronJ) StartJob(wg *sync.WaitGroup) error { + ctxTimeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + log.Info("checking transaction status") + job, err := cr.cron.AddFunc(cr.cfg.CronJob.CheckingTxStatus, func() { + if err := cr.orderService.CheckTransactionStatus(ctxTimeout); err != nil { + log.Error(err) + } + }) + + if err != nil { + cr.cron.Remove(job) + return err + } + + cr.cron.Run() + + defer wg.Done() + return nil +} diff --git a/internal/cronjob/inject.go b/internal/cronjob/inject.go new file mode 100644 index 0000000..a54f395 --- /dev/null +++ b/internal/cronjob/inject.go @@ -0,0 +1,15 @@ +package cronjob + +import ( + "github.com/google/wire" + "github.com/robfig/cron/v3" +) + +var Set = wire.NewSet( + NewCronInstance, + NewCheckingTxStatusCronJ, +) + +func NewCronInstance() *cron.Cron { + return cron.New() +} diff --git a/internal/domain/repos/trans_repos_impl.go b/internal/domain/repos/trans_repos_impl.go index a1d24f6..90435f7 100644 --- a/internal/domain/repos/trans_repos_impl.go +++ b/internal/domain/repos/trans_repos_impl.go @@ -57,3 +57,22 @@ func (t *transactionRepository) UpdateTransaction(ctx context.Context, dao *enti return nil } + +func (t *transactionRepository) FindAllPendingTransaction(ctx context.Context) ([]*entities.TransactionLog, error) { + var txs []*entities.TransactionLog + filter := bson.M{ + "created_at": bson.M{"$gt": time.Now()}, + "status": entities.TX_PENDING, + } + + cursor, err := t.transCollection.Find(ctx, filter) + if err != nil { + return nil, err + } + + if err := cursor.All(ctx, &txs); err != nil { + return nil, err + } + + return txs, nil +} diff --git a/internal/domain/repos/transaction_repo.go b/internal/domain/repos/transaction_repo.go index b71960a..0c446f6 100644 --- a/internal/domain/repos/transaction_repo.go +++ b/internal/domain/repos/transaction_repo.go @@ -8,5 +8,6 @@ import ( type TransactionRepository interface { CreateTransactionData(ctx context.Context, dao *entities.TransactionLog) error FindByOrderID(ctx context.Context, orderID string) (*entities.TransactionLog, error) + FindAllPendingTransaction(ctx context.Context) ([]*entities.TransactionLog, error) UpdateTransaction(ctx context.Context, dao *entities.TransactionLog, commit *entities.Commits) error } diff --git a/internal/publisher/order_transaction_producer.go b/internal/publisher/order_transaction_producer.go index 558fe60..4bdd139 100644 --- a/internal/publisher/order_transaction_producer.go +++ b/internal/publisher/order_transaction_producer.go @@ -34,6 +34,33 @@ func NewOrderOrchestratorPub(cfg *config.Config) *OrderOrchestratorPub { return &producer } +func (pub *OrderOrchestratorPub) ReplyPurchaseMessage(message *message.CreateOrderReplyMessage) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + body, err := ParseOrderToByte(&message) + if err != nil { + return err + } + + log.Infof("Send message to queue %v - %v", + pub.cfg.RabbitMQ.SagaOrderEvent.Exchange, + pub.cfg.RabbitMQ.SagaOrderEvent.ReplyRoutingKey) + + err = pub.channel.PublishWithContext(ctx, + pub.cfg.RabbitMQ.SagaOrderEvent.Exchange, + pub.cfg.RabbitMQ.SagaOrderEvent.ReplyRoutingKey, + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + }) + failOnError(err, "Failed to publish a message") + + return nil +} + func (pub *OrderOrchestratorPub) PublishPurchaseProductMessage(message *message.OrderProductMessage) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/internal/server.go b/internal/server.go index 77d0006..0eff6b7 100644 --- a/internal/server.go +++ b/internal/server.go @@ -10,6 +10,7 @@ import ( "github.com/gofiber/fiber/v2/middleware/logger" "github.com/google/wire" "latipe-transaction-service/config" + "latipe-transaction-service/internal/cronjob" "latipe-transaction-service/internal/domain/repos" "latipe-transaction-service/internal/publisher" "latipe-transaction-service/internal/service" @@ -22,6 +23,7 @@ type Server struct { globalCfg *config.Config purchaseReplySub *subscriber.PurchaseReplySubscriber purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber + checkTxStatus *cronjob.CheckingTxStatusCronJ } func (serv Server) App() *fiber.App { @@ -40,6 +42,10 @@ func (serv Server) PurchaseCreateSub() *subscriber.PurchaseCreateOrchestratorSub return serv.purchaseCreateSub } +func (serv Server) CheckTxStatusCron() *cronjob.CheckingTxStatusCronJ { + return serv.checkTxStatus +} + func New() (*Server, error) { panic(wire.Build(wire.NewSet( NewServer, @@ -49,13 +55,15 @@ func New() (*Server, error) { service.Set, subscriber.Set, publisher.Set, + cronjob.Set, ))) } func NewServer( cfg *config.Config, purchaseReplySub *subscriber.PurchaseReplySubscriber, - purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber) *Server { + purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber, + checkTxStatus *cronjob.CheckingTxStatusCronJ) *Server { app := fiber.New(fiber.Config{ ReadTimeout: cfg.Server.ReadTimeout, @@ -87,5 +95,6 @@ func NewServer( app: app, purchaseReplySub: purchaseReplySub, purchaseCreateSub: purchaseCreateSub, + checkTxStatus: checkTxStatus, } } diff --git a/internal/service/orderserv/order_serv.go b/internal/service/orderserv/order_serv.go index 3941258..5b45e7e 100644 --- a/internal/service/orderserv/order_serv.go +++ b/internal/service/orderserv/order_serv.go @@ -10,4 +10,5 @@ type OrderService interface { StartPurchaseTransaction(ctx context.Context, message *message.OrderPendingMessage) error HandleTransactionPurchaseReply(ctx context.Context, message *message.CreateOrderReplyMessage, serviceType int) error RollbackTransactionPub(dao *entities.TransactionLog) error + CheckTransactionStatus(ctx context.Context) error } diff --git a/internal/service/orderserv/orderserv_impl.go b/internal/service/orderserv/orderserv_impl.go index 160fe48..886c597 100644 --- a/internal/service/orderserv/orderserv_impl.go +++ b/internal/service/orderserv/orderserv_impl.go @@ -185,3 +185,26 @@ func (o orderService) rollbackPurchaseToService(dao *entities.Commits, orderId s return nil } + +func (o orderService) CheckTransactionStatus(ctx context.Context) error { + txs, err := o.transactionRepo.FindAllPendingTransaction(ctx) + if err != nil { + return err + } + + for _, i := range txs { + if IsCommitSuccess(i.Commits) { + replyMessage := message.CreateOrderReplyMessage{ + Status: entities.TX_SUCCESS, + OrderID: i.OrderID, + } + + err := o.transactionOrchestrator.ReplyPurchaseMessage(&replyMessage) + if err != nil { + log.Errorf("publish tx order [%v] message was failed %v", i.OrderID, err) + } + } + } + + return err +} diff --git a/internal/service/orderserv/utils.go b/internal/service/orderserv/utils.go index 93d88fd..ef873ae 100644 --- a/internal/service/orderserv/utils.go +++ b/internal/service/orderserv/utils.go @@ -24,3 +24,12 @@ func MappingServiceName(serviceType int) string { } return "" } + +func IsCommitSuccess(commits []entities.Commits) bool { + for _, i := range commits { + if i.TxStatus != entities.TX_SUCCESS { + return false + } + } + return true +} diff --git a/internal/wire_gen.go b/internal/wire_gen.go index a632254..2978e05 100644 --- a/internal/wire_gen.go +++ b/internal/wire_gen.go @@ -12,6 +12,7 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/logger" "latipe-transaction-service/config" + "latipe-transaction-service/internal/cronjob" "latipe-transaction-service/internal/domain/repos" "latipe-transaction-service/internal/publisher" "latipe-transaction-service/internal/service/orderserv" @@ -35,7 +36,9 @@ func New() (*Server, error) { orderService := orderserv.NewOrderService(transactionRepository, orderOrchestratorPub) purchaseReplySubscriber := subscriber.NewPurchaseSubscriberReply(configConfig, orderService) purchaseCreateOrchestratorSubscriber := subscriber.NewPurchaseCreateOrchestratorSubscriber(configConfig, orderService) - server := NewServer(configConfig, purchaseReplySubscriber, purchaseCreateOrchestratorSubscriber) + cron := cronjob.NewCronInstance() + checkingTxStatusCronJ := cronjob.NewCheckingTxStatusCronJ(configConfig, cron, orderService) + server := NewServer(configConfig, purchaseReplySubscriber, purchaseCreateOrchestratorSubscriber, checkingTxStatusCronJ) return server, nil } @@ -46,6 +49,7 @@ type Server struct { globalCfg *config.Config purchaseReplySub *subscriber.PurchaseReplySubscriber purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber + checkTxStatus *cronjob.CheckingTxStatusCronJ } func (serv Server) App() *fiber.App { @@ -64,10 +68,15 @@ func (serv Server) PurchaseCreateSub() *subscriber.PurchaseCreateOrchestratorSub return serv.purchaseCreateSub } +func (serv Server) CheckTxStatusCron() *cronjob.CheckingTxStatusCronJ { + return serv.checkTxStatus +} + func NewServer( cfg *config.Config, purchaseReplySub *subscriber.PurchaseReplySubscriber, - purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber) *Server { + purchaseCreateSub *subscriber.PurchaseCreateOrchestratorSubscriber, + checkTxStatus *cronjob.CheckingTxStatusCronJ) *Server { app := fiber.New(fiber.Config{ ReadTimeout: cfg.Server.ReadTimeout, @@ -98,5 +107,6 @@ func NewServer( app: app, purchaseReplySub: purchaseReplySub, purchaseCreateSub: purchaseCreateSub, + checkTxStatus: checkTxStatus, } }