From 03598d944b08aaef43ae96c0e43eebbeeb264372 Mon Sep 17 00:00:00 2001 From: Viktor Radchenko <1641795+vikmeup@users.noreply.github.com> Date: Tue, 5 Jan 2021 00:04:43 -0800 Subject: [PATCH] Shared mq package functions (#22) * Simplify mq * Add QuitWorker * Update mq.go * Update mq.go * Update mq.go * Update mq.go * Update mq.go * Update mq.go * Add Bind queue * Update mq.go * Add message reject * Update mq.go * Remove Nack * Add ConsumerOptions * Update mq.go * Update mq.go * Update mq.go * Update mq.go * Update mq.go * Fix callback * Update mq.go * Add SetupGracefulShutdown * Add SetupGracefulShutdownForTimeout * Rename to SetupGracefulShutdown * Add StacktraceConfiguration --- network/gin/setup.go | 43 +++++++++++ network/middleware/sentry.go | 3 + network/middleware/shutdown.go | 19 +++++ network/mq/mq.go | 136 ++++++++++++++++++++++++--------- 4 files changed, 164 insertions(+), 37 deletions(-) create mode 100644 network/gin/setup.go create mode 100644 network/middleware/shutdown.go diff --git a/network/gin/setup.go b/network/gin/setup.go new file mode 100644 index 0000000..6e6e376 --- /dev/null +++ b/network/gin/setup.go @@ -0,0 +1,43 @@ +package gin + +import ( + "context" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" +) + +func SetupGracefulShutdown(ctx context.Context, port string, engine *gin.Engine) { + server := &http.Server{ + Addr: ":" + port, + Handler: engine, + } + + defer func() { + if err := server.Shutdown(ctx); err != nil { + log.Info("Server Shutdown: ", err) + } + }() + + signalForExit := make(chan os.Signal, 1) + signal.Notify(signalForExit, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + go func() { + if err := server.ListenAndServe(); err != nil { + log.Fatal("Application failed", err) + } + }() + log.WithFields(log.Fields{"bind": port}).Info("Running application") + + stop := <-signalForExit + log.Info("Stop signal Received", stop) + log.Info("Waiting for all jobs to stop") +} diff --git a/network/middleware/sentry.go b/network/middleware/sentry.go index b2c85d5..682e310 100644 --- a/network/middleware/sentry.go +++ b/network/middleware/sentry.go @@ -15,6 +15,9 @@ func SetupSentry(dsn string) error { return err } hook.StacktraceConfiguration.Enable = true + hook.StacktraceConfiguration.IncludeErrorBreadcrumb = true + hook.StacktraceConfiguration.Context = 10 + hook.StacktraceConfiguration.SendExceptionType = true log.AddHook(hook) return nil } diff --git a/network/middleware/shutdown.go b/network/middleware/shutdown.go new file mode 100644 index 0000000..f5369a9 --- /dev/null +++ b/network/middleware/shutdown.go @@ -0,0 +1,19 @@ +package middleware + +import ( + "os" + "os/signal" + "syscall" + "time" + + log "github.com/sirupsen/logrus" +) + +func SetupGracefulShutdown(timeout time.Duration) { + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + log.Info("Shutdown timeout: ...", timeout) + time.Sleep(timeout) + log.Info("Exiting gracefully") +} diff --git a/network/mq/mq.go b/network/mq/mq.go index 4a4cb40..a6de587 100755 --- a/network/mq/mq.go +++ b/network/mq/mq.go @@ -2,9 +2,12 @@ package mq import ( "context" - log "github.com/sirupsen/logrus" + "os" + "syscall" "time" + log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" ) @@ -12,24 +15,49 @@ var ( prefetchCount int amqpChan *amqp.Channel conn *amqp.Connection + + DefaultConsumerOptions = ConsumerOptions{ + Workers: 1, + RetryOnError: false, + RetryDelay: 0, + } ) +const () + +type Consumer interface { + Callback(msg amqp.Delivery) error +} + +type ConsumerOptions struct { + Workers int + RetryOnError bool + RetryDelay time.Duration +} + type ( - Queue string - Consumer func(amqp.Delivery) - MessageChannel <-chan amqp.Delivery + Queue string + Exchange string + MessageChannel <-chan amqp.Delivery ) -func Init(url string, prefetchChannelCount int) (err error) { +func Init(url string) (err error) { conn, err = amqp.Dial(url) if err != nil { return err } amqpChan, err = conn.Channel() - prefetchCount = prefetchChannelCount return err } +type ConsumerDefaultCallback struct { + Delivery func(amqp.Delivery) error +} + +func (c ConsumerDefaultCallback) Callback(msg amqp.Delivery) error { + return c.Delivery(msg) +} + func Close() error { err := amqpChan.Close() if err != nil { @@ -43,17 +71,42 @@ func (mc MessageChannel) GetMessage() amqp.Delivery { return <-mc } +func publish(exchange, queue string, body []byte) error { + return amqpChan.Publish(exchange, queue, false, false, amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: body, + }) +} + +// Queue + func (q Queue) Declare() error { _, err := amqpChan.QueueDeclare(string(q), true, false, false, false, nil) return err } func (q Queue) Publish(body []byte) error { - return amqpChan.Publish("", string(q), false, false, amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "text/plain", - Body: body, - }) + return publish("", string(q), body) +} + +// Exchange +func (e Exchange) Declare(kind string) error { + return amqpChan.ExchangeDeclare(string(e), kind, true, false, false, false, nil) +} + +func (e Exchange) Bind(queues []Queue) error { + for _, queue := range queues { + err := amqpChan.QueueBind(string(queue), "", string(e), false, nil) + if err != nil { + return err + } + } + return nil +} + +func (e Exchange) Publish(body []byte) error { + return publish(string(e), "", body) } func (q Queue) GetMessageChannel() MessageChannel { @@ -71,7 +124,7 @@ func (q Queue) GetMessageChannel() MessageChannel { } err = amqpChan.Qos( - prefetchCount, + 50, 0, true, ) @@ -82,7 +135,33 @@ func (q Queue) GetMessageChannel() MessageChannel { return messageChannel } -func (q Queue) RunConsumerWithCancel(consumer Consumer, async bool, ctx context.Context) { +func worker(messages <-chan amqp.Delivery, consumer Consumer) { + for msg := range messages { + err := consumer.Callback(msg) + if err != nil { + log.Error(err) + } + //if options.RetryOnError { + // if err := message.Nack(false, true); err != nil { + // log.Error(err) + // } + // time.Sleep(options.RetryDelay) + //} else { + // if err := message.Ack(false); err != nil { + // log.Error(err) + // } + //} + if err := msg.Ack(false); err != nil { + log.Error(err) + } + } +} + +func (q Queue) RunConsumer(consumer Consumer, options ConsumerOptions, ctx context.Context) { + messages := make(chan amqp.Delivery) + for w := 1; w <= options.Workers; w++ { + go worker(messages, consumer) + } messageChannel := q.GetMessageChannel() for { select { @@ -93,35 +172,18 @@ func (q Queue) RunConsumerWithCancel(consumer Consumer, async bool, ctx context. if message.Body == nil { continue } - if async { - go consumer(message) - } else { - consumer(message) - } + messages <- message } } } -func RestoreConnectionWorker(url string, queue Queue, timeout time.Duration) { - log.Info("Run MQ RestoreConnectionWorker") + +func QuitWorker(timeout time.Duration, quit chan<- os.Signal) { + log.Info("Run CancelWorker") for { if conn.IsClosed() { - for { - log.Warn("MQ is not available now") - log.Warn("Trying to connect to MQ...") - if err := Init(url, prefetchCount); err != nil { - log.Warn("MQ is still unavailable") - time.Sleep(timeout) - continue - } - if err := queue.Declare(); err != nil { - log.Warn("Can't declare queues:", queue) - time.Sleep(timeout) - continue - } else { - log.Info("MQ connection restored") - break - } - } + log.Error("MQ is not available now") + quit <- syscall.SIGTERM + return } time.Sleep(timeout) }