Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Helpful functions #127

Closed
wants to merge 13 commits into from
Closed
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that pr

Supported by [Boot.dev](https://boot.dev)

[![](https://godoc.org/github.com/wagslane/go-rabbitmq?status.svg)](https://godoc.org/github.com/wagslane/go-rabbitmq)![Deploy](https://github.com/wagslane/go-rabbitmq/workflows/Tests/badge.svg)
[![](https://godoc.org/github.com/DizoftTeam/go-rabbitmq?status.svg)](https://godoc.org/github.com/DizoftTeam/go-rabbitmq)![Deploy](https://github.com/DizoftTeam/go-rabbitmq/workflows/Tests/badge.svg)

## Motivation

Expand All @@ -25,7 +25,7 @@ The goal with `go-rabbitmq` is to provide *most* (but not all) of the nitty-grit
Inside a Go module:

```bash
go get github.com/wagslane/go-rabbitmq
go get github.com/DizoftTeam/go-rabbitmq
```

## 🚀 Quick Start Consumer
Expand Down
34 changes: 32 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package rabbitmq

import (
"sync"

"github.com/DizoftTeam/go-rabbitmq/internal/connectionmanager"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
)

// Conn manages the connection to a rabbit cluster
Expand All @@ -12,6 +14,10 @@ type Conn struct {
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}

reconnectHooks []func(error)
looseConnectionCh <-chan error
mutex *sync.RWMutex

options ConnectionOptions
}

Expand All @@ -34,24 +40,48 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)
return nil, err
}

reconnectErrCh, closeCh := manager.NotifyReconnect()
reconnectErrCh, closeCh, looseConnectionCh := manager.NotifyReconnect()

conn := &Conn{
connectionManager: manager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
options: *options,
looseConnectionCh: looseConnectionCh,

mutex: &sync.RWMutex{},
}

go conn.handleLooseConnection()
go conn.handleRestarts()

return conn, nil
}

func (conn *Conn) handleLooseConnection() {
for err := range conn.looseConnectionCh {
conn.mutex.Lock()

for _, fhook := range conn.reconnectHooks {
fhook(err)
}

conn.mutex.Unlock()
}
}

func (conn *Conn) handleRestarts() {
for err := range conn.reconnectErrCh {
conn.options.Logger.Infof("successful connection recovery from: %v", err)
}
}

func (conn *Conn) RegisterReconnectHook(hook func(error)) {
conn.mutex.Lock()
conn.reconnectHooks = append(conn.reconnectHooks, hook)
conn.mutex.Unlock()
}

// Close closes the connection, it's not safe for re-use.
// You should also close any consumers and publishers before
// closing the connection
Expand Down
38 changes: 25 additions & 13 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"sync"

"github.com/DizoftTeam/go-rabbitmq/internal/channelmanager"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
)

// Action is an action that occurs after processed this delivery
Expand All @@ -31,6 +31,7 @@ type Consumer struct {
chanManager *channelmanager.ChannelManager
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
notifyClosedChan <-chan error
options ConsumerOptions

isClosedMux *sync.RWMutex
Expand Down Expand Up @@ -67,12 +68,13 @@ func NewConsumer(
if err != nil {
return nil, err
}
reconnectErrCh, closeCh := chanManager.NotifyReconnect()
reconnectErrCh, closeCh, notifyClosedChan := chanManager.NotifyReconnect()

consumer := &Consumer{
chanManager: chanManager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
notifyClosedChan: notifyClosedChan,
options: *options,
isClosedMux: &sync.RWMutex{},
isClosed: false,
Expand All @@ -82,20 +84,24 @@ func NewConsumer(
handler,
*options,
)

if err != nil {
return nil, err
}

go func() {
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)

err = consumer.startGoroutines(
handler,
*options,
)

if err != nil {
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err)
consumer.options.Logger.Fatalf("consumer closing, unable to recover")

return
}
}
Expand All @@ -111,18 +117,18 @@ func NewConsumer(
func (consumer *Consumer) Close() {
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()

consumer.isClosed = true

// close the channel so that rabbitmq server knows that the
// consumer has been stopped.
err := consumer.chanManager.Close()
if err != nil {
if err := consumer.chanManager.Close(); err != nil {
consumer.options.Logger.Warnf("error while closing the channel: %v", err)
}

consumer.options.Logger.Infof("closing consumer...")
go func() {
consumer.closeConnectionToManagerCh <- struct{}{}
}()

close(consumer.closeConnectionToManagerCh)
}

// startGoroutines declares the queue if it doesn't exist,
Expand All @@ -137,19 +143,20 @@ func (consumer *Consumer) startGoroutines(
0,
options.QOSGlobal,
)

if err != nil {
return fmt.Errorf("declare qos failed: %w", err)
}
err = declareExchange(consumer.chanManager, options.ExchangeOptions)
if err != nil {

if err = declareExchange(consumer.chanManager, options.ExchangeOptions); err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
err = declareQueue(consumer.chanManager, options.QueueOptions)
if err != nil {

if err = declareQueue(consumer.chanManager, options.QueueOptions); err != nil {
return fmt.Errorf("declare queue failed: %w", err)
}
err = declareBindings(consumer.chanManager, options)
if err != nil {

if err = declareBindings(consumer.chanManager, options); err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
}

Expand All @@ -162,20 +169,24 @@ func (consumer *Consumer) startGoroutines(
options.RabbitConsumerOptions.NoWait,
tableToAMQPTable(options.RabbitConsumerOptions.Args),
)

if err != nil {
return err
}

for i := 0; i < options.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, options, handler)
}

consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency)

return nil
}

func (consumer *Consumer) getIsClosed() bool {
consumer.isClosedMux.RLock()
defer consumer.isClosedMux.RUnlock()

return consumer.isClosed
}

Expand Down Expand Up @@ -208,5 +219,6 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
}
}
}

consumer.options.Logger.Infof("rabbit consumer goroutine closed")
}
2 changes: 1 addition & 1 deletion consumer_options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package rabbitmq

import (
"github.com/DizoftTeam/go-rabbitmq/internal/logger"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/logger"
)

// getDefaultConsumerOptions describes the options that will be used when a value isn't provided
Expand Down
70 changes: 69 additions & 1 deletion declare.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,74 @@
package rabbitmq

import (
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
"errors"

"github.com/DizoftTeam/go-rabbitmq/internal/channelmanager"
)

type Declarator struct {
chanManager *channelmanager.ChannelManager
}

func NewDeclarator(conn *Conn) (*Declarator, error) {
if conn.connectionManager == nil {
return nil, errors.New("connection manager can't be nil")
}

chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, &stdDebugLogger{}, conn.connectionManager.ReconnectInterval)
if err != nil {
return nil, err
}

result := &Declarator{
chanManager: chanManager,
}

return result, nil
}

func (d *Declarator) Close() {
d.chanManager.Close()
}

func (d *Declarator) DeclareExchange(optionFuncs ...func(*PublisherOptions)) error {
defaultOptions := getDefaultPublisherOptions()
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
}

return declareExchange(d.chanManager, options.ExchangeOptions)
}

func (d *Declarator) DeclareQueue(queue string, optionFuncs ...func(*ConsumerOptions)) error {
defaultOptions := getDefaultConsumerOptions(queue)
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
}

return declareQueue(d.chanManager, options.QueueOptions)
}

func (d *Declarator) BindExchanges(bindings []ExchangeBinding) error {
for _, binding := range bindings {
err := d.chanManager.ExchangeBindSafe(
binding.From,
binding.RoutingKey,
binding.To,
binding.NoWait,
tableToAMQPTable(binding.Args),
)

if err != nil {
return err
}
}

return nil
}

func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error {
if !options.Declare {
return nil
Expand Down Expand Up @@ -75,16 +140,19 @@ func declareBindings(chanManager *channelmanager.ChannelManager, options Consume
if !binding.Declare {
continue
}

err := chanManager.QueueBindSafe(
options.QueueOptions.Name,
binding.RoutingKey,
options.ExchangeOptions.Name,
binding.NoWait,
tableToAMQPTable(binding.Args),
)

if err != nil {
return err
}
}

return nil
}
9 changes: 9 additions & 0 deletions declate_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package rabbitmq

type ExchangeBinding struct {
From string
To string
RoutingKey string
Args Table
NoWait bool
}
2 changes: 1 addition & 1 deletion examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os/signal"
"syscall"

rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion examples/logger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"log"

rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
)

// errorLogger is used in WithPublisherOptionsLogger to create a custom logger
Expand Down
2 changes: 1 addition & 1 deletion examples/multiconsumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os/signal"
"syscall"

rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion examples/multipublisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"syscall"
"time"

rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"syscall"
"time"

rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
)

func main() {
Expand Down
Loading
Loading