Skip to content

Commit

Permalink
[#35]: feat: pass amqp.Table as an arg to the QueueDeclare method
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Dec 21, 2022
2 parents 66f4a45 + 566f222 commit 57e04bc
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 26 deletions.
18 changes: 12 additions & 6 deletions amqpjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
dlxTTL string = "x-message-ttl"
dlxExpires string = "x-expires"

// new in 2.12.2
queueHeaders string = "queue_headers"

contentType string = "application/octet-stream"
)

Expand All @@ -41,19 +44,22 @@ type config struct {
Exchange string `mapstructure:"exchange"`
ExchangeType string `mapstructure:"exchange_type"`

// new in 2.12
ExchangeDurable bool `mapstructure:"exchange_durable"`
ExchangeAutoDelete bool `mapstructure:"exchange_auto_delete"`
QueueAutoDelete bool `mapstructure:"queue_auto_delete"`
RedialTimeout int `mapstructure:"redial_timeout"`

RoutingKey string `mapstructure:"routing_key"`
ConsumeAll bool `mapstructure:"consume_all"`
Exclusive bool `mapstructure:"exclusive"`
Durable bool `mapstructure:"durable"`
DeleteQueueOnStop bool `mapstructure:"delete_queue_on_stop"`
MultipleAck bool `mapstructure:"multiple_ask"`
RequeueOnFail bool `mapstructure:"requeue_on_fail"`

// new in 2.12.1
ExchangeDurable bool `mapstructure:"exchange_durable"`
ExchangeAutoDelete bool `mapstructure:"exchange_auto_delete"`
QueueAutoDelete bool `mapstructure:"queue_auto_delete"`
RedialTimeout int `mapstructure:"redial_timeout"`

// new in 2.12.2
QueueHeaders map[string]any `mapstructure:"queue_headers"`
}

func (c *config) InitDefault() {
Expand Down
39 changes: 36 additions & 3 deletions amqpjobs/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/roadrunner-server/errors"
Expand Down Expand Up @@ -43,7 +44,7 @@ type Consumer struct {
notifyClosePubCh chan *amqp.Error
notifyCloseConsumeCh chan *amqp.Error
notifyCloseStatCh chan *amqp.Error
redialCh chan *amqp.Error
redialCh chan *redialMsg

conn *amqp.Connection
consumeChan *amqp.Channel
Expand All @@ -70,6 +71,9 @@ type Consumer struct {
exchangeAutoDelete bool
queueAutoDelete bool

// new in 2.12.2
queueHeaders map[string]any

listeners uint32
delayed *int64
stopCh chan struct{}
Expand Down Expand Up @@ -119,7 +123,7 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg Configurer, pq prior

publishChan: make(chan *amqp.Channel, 1),
stateChan: make(chan *amqp.Channel, 1),
redialCh: make(chan *amqp.Error, 5),
redialCh: make(chan *redialMsg, 5),

notifyCloseConsumeCh: make(chan *amqp.Error, 1),
notifyCloseConnCh: make(chan *amqp.Error, 1),
Expand All @@ -142,6 +146,8 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg Configurer, pq prior
exchangeAutoDelete: conf.ExchangeAutoDelete,
exchangeDurable: conf.ExchangeDurable,
queueAutoDelete: conf.QueueAutoDelete,
// 2.12.2
queueHeaders: conf.QueueHeaders,
}

jb.conn, err = amqp.Dial(conf.Addr)
Expand Down Expand Up @@ -216,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg Configurer,
publishChan: make(chan *amqp.Channel, 1),
stateChan: make(chan *amqp.Channel, 1),

redialCh: make(chan *amqp.Error, 5),
redialCh: make(chan *redialMsg, 5),
notifyCloseConsumeCh: make(chan *amqp.Error, 1),
notifyCloseConnCh: make(chan *amqp.Error, 1),
notifyCloseStatCh: make(chan *amqp.Error, 1),
Expand All @@ -240,6 +246,21 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg Configurer,
exchangeAutoDelete: pipeline.Bool(exchangeAutoDelete, false),
exchangeDurable: pipeline.Bool(exchangeDurable, false),
queueAutoDelete: pipeline.Bool(queueAutoDelete, false),

// 2.12.2
queueHeaders: nil,
}

v := pipeline.String(queueHeaders, "")
if v != "" {
var tp map[string]any
err = json.Unmarshal([]byte(v), &tp)
if err != nil {
log.Warn("failed to unmarshal headers", zap.String("value", v))
return nil, err
}

jb.queueHeaders = tp
}

jb.conn, err = amqp.Dial(conf.Addr)
Expand Down Expand Up @@ -320,7 +341,13 @@ func (c *Consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
c.mu.Lock()
defer c.mu.Unlock()

// declare/bind/check the queue
var err error
err = c.declareQueue()
if err != nil {
return err
}

c.consumeChan, err = c.conn.Channel()
if err != nil {
return errors.E(op, err)
Expand Down Expand Up @@ -437,6 +464,12 @@ func (c *Consumer) Resume(_ context.Context, p string) {
}

var err error
err = c.declareQueue()
if err != nil {
c.log.Error("unable to start listener", zap.Error(err))
return
}

c.consumeChan, err = c.conn.Channel()
if err != nil {
c.log.Error("create channel", zap.Error(err))
Expand Down
7 changes: 4 additions & 3 deletions amqpjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,10 @@ func (c *Consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
Payload: utils.AsString(d.Body),
Headers: convHeaders(d.Headers),
Options: &Options{
Priority: 10,
Delay: 0,
Pipeline: auto,
Priority: 10,
Delay: 0,
// in case of `deduced_by_rr` type of the JOB, we're sending a queue name
Pipeline: c.queue,
ack: d.Ack,
nack: d.Nack,
requeueFn: c.handleItem,
Expand Down
12 changes: 11 additions & 1 deletion amqpjobs/rabbit_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,24 @@ func (c *Consumer) initRabbitMQ() error {
return errors.E(op, err)
}

return channel.Close()
}

func (c *Consumer) declareQueue() error {
const op = errors.Op("jobs_plugin_rmq_queue_declare")
channel, err := c.conn.Channel()
if err != nil {
return errors.E(op, err)
}

// verify or declare a queue
q, err := channel.QueueDeclare(
c.queue,
c.durable,
c.queueAutoDelete,
c.exclusive,
false,
nil,
c.queueHeaders,
)
if err != nil {
return errors.E(op, err)
Expand Down
61 changes: 48 additions & 13 deletions amqpjobs/redial.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import (
"go.uber.org/zap"
)

const (
ConnCloseType string = "connection"
ConsumeCloseType string = "consume"
PublishCloseType string = "publish"
StatCloseType string = "stat"
)

type redialMsg struct {
t string
err *amqp.Error
}

// redialer used to redial to the rabbitmq in case of the connection interrupts
func (c *Consumer) redialer() { //nolint:gocognit,gocyclo
go func() {
Expand All @@ -28,7 +40,10 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo
}

select {
case c.redialCh <- err:
case c.redialCh <- &redialMsg{
t: ConnCloseType,
err: err,
}:
c.log.Debug("exited from redialer")
return
default:
Expand All @@ -49,13 +64,17 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo
}

select {
case c.redialCh <- err:
case c.redialCh <- &redialMsg{
t: ConsumeCloseType,
err: err,
}:
c.log.Debug("exited from redialer")
return
default:
c.log.Debug("exited from redialer")
return
}

case err := <-c.notifyClosePubCh:
if err == nil {
c.log.Debug("exited from redialer")
Expand All @@ -69,13 +88,17 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo
}

select {
case c.redialCh <- err:
case c.redialCh <- &redialMsg{
t: PublishCloseType,
err: err,
}:
c.log.Debug("exited from redialer")
return
default:
c.log.Debug("exited from redialer")
return
}

case err := <-c.notifyCloseStatCh:
if err == nil {
c.log.Debug("redialer stopped")
Expand All @@ -89,7 +112,10 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo
}

select {
case c.redialCh <- err:
case c.redialCh <- &redialMsg{
t: StatCloseType,
err: err,
}:
c.log.Debug("redialer stopped")
return
default:
Expand Down Expand Up @@ -170,23 +196,23 @@ func (c *Consumer) reset() {

func (c *Consumer) redialMergeCh() {
go func() {
for err := range c.redialCh {
for rm := range c.redialCh {
c.mu.Lock()
c.redial(err)
c.redial(rm)
c.mu.Unlock()
}
}()
}

func (c *Consumer) redial(amqpErr *amqp.Error) {
func (c *Consumer) redial(rm *redialMsg) {
const op = errors.Op("rabbitmq_redial")
// trash the broken publishing channel
c.reset()

t := time.Now().UTC()
pipe := c.pipeline.Load()

c.log.Error("pipeline connection was closed, redialing", zap.Error(amqpErr), zap.String("pipeline", pipe.Name()), zap.String("driver", pipe.Driver()), zap.Time("start", t))
c.log.Error("pipeline connection was closed, redialing", zap.Error(rm.err), zap.String("pipeline", pipe.Name()), zap.String("driver", pipe.Driver()), zap.Time("start", t))

expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
Expand Down Expand Up @@ -247,20 +273,29 @@ func (c *Consumer) redial(amqpErr *amqp.Error) {
c.notifyClosePubCh = make(chan *amqp.Error, 1)
c.notifyCloseStatCh = make(chan *amqp.Error, 1)
c.notifyCloseConnCh = make(chan *amqp.Error, 1)
c.notifyCloseConsumeCh = make(chan *amqp.Error, 1)

c.conn.NotifyClose(c.notifyCloseConnCh)
c.consumeChan.NotifyClose(c.notifyCloseConsumeCh)
pch.NotifyClose(c.notifyClosePubCh)
sch.NotifyClose(c.notifyCloseStatCh)

// put the fresh channels
c.stateChan <- sch
c.publishChan <- pch

// restart listener
atomic.StoreUint32(&c.listeners, 1)
c.listener(deliv)
// we should restore the listener only when we previously had an active listener
// OR if we get a Consume Closed type of the error
if atomic.LoadUint32(&c.listeners) == 1 || rm.t == ConsumeCloseType {
c.notifyCloseConsumeCh = make(chan *amqp.Error, 1)
c.consumeChan.NotifyClose(c.notifyCloseConsumeCh)
// restart listener
err = c.declareQueue()
if err != nil {
return err
}

atomic.StoreUint32(&c.listeners, 1)
c.listener(deliv)
}

c.log.Info("queues and subscribers was redeclared successfully")

Expand Down

0 comments on commit 57e04bc

Please sign in to comment.