Skip to content

Commit

Permalink
docs and multiple exchanges
Browse files Browse the repository at this point in the history
  • Loading branch information
wagslane committed Mar 4, 2024
1 parent 937bab2 commit 28863cc
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 48 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# go-rabbitmq

Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐
A wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐

Supported by [Boot.dev](https://boot.dev)

Expand Down Expand Up @@ -103,7 +103,7 @@ See the [examples](examples) directory for more ideas.

* By default, queues are declared if they didn't already exist by new consumers
* By default, routing-key bindings are declared by consumers if you're using `WithConsumerOptionsRoutingKey`
* By default, exchanges are *not* declared by publishers or consumers if they didn't already exist, hence `WithPublisherOptionsExchangeDeclare` and `WithConsumerOptionsExchangeDeclare`.
* By default, exchanges are *not* declared by publishers or consumers if they don't already exist, hence `WithPublisherOptionsExchangeDeclare` and `WithConsumerOptionsExchangeDeclare`.

Read up on all the options in the GoDoc, there are quite a few of them. I try to pick sane and simple defaults.

Expand Down
8 changes: 5 additions & 3 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ func (consumer *Consumer) startGoroutines(
if err != nil {
return fmt.Errorf("declare qos failed: %w", err)
}
err = declareExchange(consumer.chanManager, options.ExchangeOptions)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
for _, exchangeOption := range options.ExchangeOptions {
err = declareExchange(consumer.chanManager, exchangeOption)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
}
err = declareQueue(consumer.chanManager, options.QueueOptions)
if err != nil {
Expand Down
86 changes: 57 additions & 29 deletions consumer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,26 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
Args: Table{},
Declare: true,
},
ExchangeOptions: ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
},
Bindings: []Binding{},
Concurrency: 1,
Logger: stdDebugLogger{},
QOSPrefetch: 10,
QOSGlobal: false,
ExchangeOptions: []ExchangeOptions{},
Concurrency: 1,
Logger: stdDebugLogger{},
QOSPrefetch: 10,
QOSGlobal: false,
}
}

func getDefaultExchangeOptions() ExchangeOptions {
return ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
Bindings: []Binding{},
}
}

Expand All @@ -60,8 +64,7 @@ func getDefaultBindingOptions() BindingOptions {
type ConsumerOptions struct {
RabbitConsumerOptions RabbitConsumerOptions
QueueOptions QueueOptions
ExchangeOptions ExchangeOptions
Bindings []Binding
ExchangeOptions []ExchangeOptions
Concurrency int
Logger logger.Logger
QOSPrefetch int
Expand Down Expand Up @@ -144,61 +147,77 @@ func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) {
}
}

func ensureExchangeOptions(options *ConsumerOptions) {
if len(options.ExchangeOptions) == 0 {
options.ExchangeOptions = append(options.ExchangeOptions, getDefaultExchangeOptions())
}
}

// WithConsumerOptionsExchangeName sets the exchange name
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Name = name
ensureExchangeOptions(options)
options.ExchangeOptions[0].Name = name
}
}

// WithConsumerOptionsExchangeKind ensures the queue is a durable queue
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Kind = kind
ensureExchangeOptions(options)
options.ExchangeOptions[0].Kind = kind
}
}

// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) {
options.ExchangeOptions.Durable = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Durable = true
}

// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) {
options.ExchangeOptions.AutoDelete = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].AutoDelete = true
}

// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) {
options.ExchangeOptions.Internal = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Internal = true
}

// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) {
options.ExchangeOptions.NoWait = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].NoWait = true
}

// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) {
options.ExchangeOptions.Declare = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Declare = true
}

// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange
func WithConsumerOptionsExchangePassive(options *ConsumerOptions) {
options.ExchangeOptions.Passive = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Passive = true
}

// WithConsumerOptionsExchangeArgs adds optional args to the exchange
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Args = args
ensureExchangeOptions(options)
options.ExchangeOptions[0].Args = args
}
}

// WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options
func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Bindings = append(options.Bindings, Binding{
ensureExchangeOptions(options)
options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, Binding{
RoutingKey: routingKey,
BindingOptions: getDefaultBindingOptions(),
})
Expand All @@ -210,7 +229,16 @@ func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
// the zero value. If you want to declare your bindings for example, be sure to set Declare=true
func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Bindings = append(options.Bindings, binding)
ensureExchangeOptions(options)
options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, binding)
}
}

// WithConsumerOptionsExchangeOptions adds a new exchange to the consumer, this should probably only be
// used if you want to to consume from multiple exchanges on the same consumer
func WithConsumerOptionsExchangeOptions(exchangeOptions ExchangeOptions) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions = append(options.ExchangeOptions, exchangeOptions)
}
}

Expand Down
28 changes: 15 additions & 13 deletions declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,21 @@ func declareExchange(chanManager *channelmanager.ChannelManager, options Exchang
}

func declareBindings(chanManager *channelmanager.ChannelManager, options ConsumerOptions) error {
for _, binding := range options.Bindings {
if !binding.Declare {
continue
}
err := chanManager.QueueBindSafe(
options.QueueOptions.Name,
binding.RoutingKey,
options.ExchangeOptions.Name,
binding.NoWait,
tableToAMQPTable(binding.Args),
)
if err != nil {
return err
for _, exchangeOption := range options.ExchangeOptions {
for _, binding := range exchangeOption.Bindings {
if !binding.Declare {
continue
}
err := chanManager.QueueBindSafe(
options.QueueOptions.Name,
binding.RoutingKey,
exchangeOption.Name,
binding.NoWait,
tableToAMQPTable(binding.Args),
)
if err != nil {
return err
}
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions exchange_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type ExchangeOptions struct {
Passive bool // if false, a missing exchange will be created on the server
Args Table
Declare bool
Bindings []Binding
}
2 changes: 1 addition & 1 deletion publisher_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) {
options.ExchangeOptions.NoWait = true
}

// WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance
// WithPublisherOptionsExchangeDeclare will create the exchange if it doesn't exist
func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) {
options.ExchangeOptions.Declare = true
}
Expand Down

0 comments on commit 28863cc

Please sign in to comment.