Skip to content

Commit

Permalink
Shared mq package functions (#22)
Browse files Browse the repository at this point in the history
* Simplify mq

* Add QuitWorker

* Update mq.go

* Update mq.go

* Update mq.go

* Update mq.go

* Update mq.go

* Update mq.go

* Add Bind queue

* Update mq.go

* Add message reject

* Update mq.go

* Remove Nack

* Add ConsumerOptions

* Update mq.go

* Update mq.go

* Update mq.go

* Update mq.go

* Update mq.go

* Fix callback

* Update mq.go

* Add SetupGracefulShutdown

* Add SetupGracefulShutdownForTimeout

* Rename to SetupGracefulShutdown

* Add StacktraceConfiguration
  • Loading branch information
vikmeup authored Jan 5, 2021
1 parent 78c950f commit 03598d9
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 37 deletions.
43 changes: 43 additions & 0 deletions network/gin/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gin

import (
"context"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)

func SetupGracefulShutdown(ctx context.Context, port string, engine *gin.Engine) {
server := &http.Server{
Addr: ":" + port,
Handler: engine,
}

defer func() {
if err := server.Shutdown(ctx); err != nil {
log.Info("Server Shutdown: ", err)
}
}()

signalForExit := make(chan os.Signal, 1)
signal.Notify(signalForExit,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

go func() {
if err := server.ListenAndServe(); err != nil {
log.Fatal("Application failed", err)
}
}()
log.WithFields(log.Fields{"bind": port}).Info("Running application")

stop := <-signalForExit
log.Info("Stop signal Received", stop)
log.Info("Waiting for all jobs to stop")
}
3 changes: 3 additions & 0 deletions network/middleware/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ func SetupSentry(dsn string) error {
return err
}
hook.StacktraceConfiguration.Enable = true
hook.StacktraceConfiguration.IncludeErrorBreadcrumb = true
hook.StacktraceConfiguration.Context = 10
hook.StacktraceConfiguration.SendExceptionType = true
log.AddHook(hook)
return nil
}
19 changes: 19 additions & 0 deletions network/middleware/shutdown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package middleware

import (
"os"
"os/signal"
"syscall"
"time"

log "github.com/sirupsen/logrus"
)

func SetupGracefulShutdown(timeout time.Duration) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Info("Shutdown timeout: ...", timeout)
time.Sleep(timeout)
log.Info("Exiting gracefully")
}
136 changes: 99 additions & 37 deletions network/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,62 @@ package mq

import (
"context"
log "github.com/sirupsen/logrus"
"os"
"syscall"
"time"

log "github.com/sirupsen/logrus"

"github.com/streadway/amqp"
)

var (
prefetchCount int
amqpChan *amqp.Channel
conn *amqp.Connection

DefaultConsumerOptions = ConsumerOptions{
Workers: 1,
RetryOnError: false,
RetryDelay: 0,
}
)

const ()

type Consumer interface {
Callback(msg amqp.Delivery) error
}

type ConsumerOptions struct {
Workers int
RetryOnError bool
RetryDelay time.Duration
}

type (
Queue string
Consumer func(amqp.Delivery)
MessageChannel <-chan amqp.Delivery
Queue string
Exchange string
MessageChannel <-chan amqp.Delivery
)

func Init(url string, prefetchChannelCount int) (err error) {
func Init(url string) (err error) {
conn, err = amqp.Dial(url)
if err != nil {
return err
}
amqpChan, err = conn.Channel()
prefetchCount = prefetchChannelCount
return err
}

type ConsumerDefaultCallback struct {
Delivery func(amqp.Delivery) error
}

func (c ConsumerDefaultCallback) Callback(msg amqp.Delivery) error {
return c.Delivery(msg)
}

func Close() error {
err := amqpChan.Close()
if err != nil {
Expand All @@ -43,17 +71,42 @@ func (mc MessageChannel) GetMessage() amqp.Delivery {
return <-mc
}

func publish(exchange, queue string, body []byte) error {
return amqpChan.Publish(exchange, queue, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
})
}

// Queue

func (q Queue) Declare() error {
_, err := amqpChan.QueueDeclare(string(q), true, false, false, false, nil)
return err
}

func (q Queue) Publish(body []byte) error {
return amqpChan.Publish("", string(q), false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
})
return publish("", string(q), body)
}

// Exchange
func (e Exchange) Declare(kind string) error {
return amqpChan.ExchangeDeclare(string(e), kind, true, false, false, false, nil)
}

func (e Exchange) Bind(queues []Queue) error {
for _, queue := range queues {
err := amqpChan.QueueBind(string(queue), "", string(e), false, nil)
if err != nil {
return err
}
}
return nil
}

func (e Exchange) Publish(body []byte) error {
return publish(string(e), "", body)
}

func (q Queue) GetMessageChannel() MessageChannel {
Expand All @@ -71,7 +124,7 @@ func (q Queue) GetMessageChannel() MessageChannel {
}

err = amqpChan.Qos(
prefetchCount,
50,
0,
true,
)
Expand All @@ -82,7 +135,33 @@ func (q Queue) GetMessageChannel() MessageChannel {
return messageChannel
}

func (q Queue) RunConsumerWithCancel(consumer Consumer, async bool, ctx context.Context) {
func worker(messages <-chan amqp.Delivery, consumer Consumer) {
for msg := range messages {
err := consumer.Callback(msg)
if err != nil {
log.Error(err)
}
//if options.RetryOnError {
// if err := message.Nack(false, true); err != nil {
// log.Error(err)
// }
// time.Sleep(options.RetryDelay)
//} else {
// if err := message.Ack(false); err != nil {
// log.Error(err)
// }
//}
if err := msg.Ack(false); err != nil {
log.Error(err)
}
}
}

func (q Queue) RunConsumer(consumer Consumer, options ConsumerOptions, ctx context.Context) {
messages := make(chan amqp.Delivery)
for w := 1; w <= options.Workers; w++ {
go worker(messages, consumer)
}
messageChannel := q.GetMessageChannel()
for {
select {
Expand All @@ -93,35 +172,18 @@ func (q Queue) RunConsumerWithCancel(consumer Consumer, async bool, ctx context.
if message.Body == nil {
continue
}
if async {
go consumer(message)
} else {
consumer(message)
}
messages <- message
}
}
}
func RestoreConnectionWorker(url string, queue Queue, timeout time.Duration) {
log.Info("Run MQ RestoreConnectionWorker")

func QuitWorker(timeout time.Duration, quit chan<- os.Signal) {
log.Info("Run CancelWorker")
for {
if conn.IsClosed() {
for {
log.Warn("MQ is not available now")
log.Warn("Trying to connect to MQ...")
if err := Init(url, prefetchCount); err != nil {
log.Warn("MQ is still unavailable")
time.Sleep(timeout)
continue
}
if err := queue.Declare(); err != nil {
log.Warn("Can't declare queues:", queue)
time.Sleep(timeout)
continue
} else {
log.Info("MQ connection restored")
break
}
}
log.Error("MQ is not available now")
quit <- syscall.SIGTERM
return
}
time.Sleep(timeout)
}
Expand Down

0 comments on commit 03598d9

Please sign in to comment.