Skip to content

Commit

Permalink
Add cluster support via Resolver
Browse files Browse the repository at this point in the history
Signed-off-by: Ruslan Bayandinov <wazsone@ya.ru>
  • Loading branch information
wazsone committed Jun 3, 2024
1 parent eebea88 commit 5802dd2
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 12 deletions.
40 changes: 34 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package rabbitmq

import (
"math/rand"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
)
Expand All @@ -21,27 +23,53 @@ type Conn struct {
// will be stored in the returned connection's Config field.
type Config amqp.Config

type Resolver = connectionmanager.Resolver

type StaticResolver struct {
urls []string
shuffe bool
}

func (r *StaticResolver) Resolve() ([]string, error) {
// TODO: move to slices.Clone when supported Go versions > 1.21
var urls []string
urls = append(urls, r.urls...)

if r.shuffe {
rand.Shuffle(len(urls), func(i, j int) {
urls[i], urls[j] = urls[j], urls[i]
})
}
return urls, nil
}

func NewStaticResolver(urls []string, shuffle bool) *StaticResolver {
return &StaticResolver{urls: urls}
}

// NewConn creates a new connection manager
func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) {
func NewConn(url string, opts ...func(*ConnectionOptions)) (*Conn, error) {
return NewClusterConn(NewStaticResolver([]string{url}, false), opts...)
}

func NewClusterConn(resolver Resolver, opts ...func(*ConnectionOptions)) (*Conn, error) {
defaultOptions := getDefaultConnectionOptions()
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
for _, optFn := range opts {
optFn(options)
}

manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
manager, err := connectionmanager.NewConnectionManager(resolver, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
if err != nil {
return nil, err
}

reconnectErrCh, closeCh := manager.NotifyReconnect()
conn := &Conn{
connectionManager: manager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
options: *options,
}

go conn.handleRestarts()
return conn, nil
}
Expand Down
25 changes: 25 additions & 0 deletions examples/cluster/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"log"

rabbitmq "github.com/wagslane/go-rabbitmq"
)

func main() {
resolver := rabbitmq.NewStaticResolver(
[]string{
"amqp://guest:guest@host1",
"amqp://guest:guest@host2",
"amqp://guest:guest@host3",
},
false, /* shuffle */
)

conn, err := rabbitmq.NewClusterConn(resolver)
if err != nil {
log.Fatal(err)
}
defer conn.Close()

}
40 changes: 34 additions & 6 deletions internal/connectionmanager/connection_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package connectionmanager

import (
"errors"
"fmt"
"sync"
"time"

Expand All @@ -12,7 +14,7 @@ import (
// ConnectionManager -
type ConnectionManager struct {
logger logger.Logger
url string
resolver Resolver
connection *amqp.Connection
amqpConfig amqp.Config
connectionMux *sync.RWMutex
Expand All @@ -22,15 +24,40 @@ type ConnectionManager struct {
dispatcher *dispatcher.Dispatcher
}

type Resolver interface {
Resolve() ([]string, error)
}

// dial will attempt to connect to the a list of urls in the order they are
// given.
func dial(log logger.Logger, resolver Resolver, conf amqp.Config) (*amqp.Connection, error) {
urls, err := resolver.Resolve()
if err != nil {
return nil, fmt.Errorf("error resolving amqp server urls: %w", err)
}

var errs []error
for _, url := range urls {
conn, err := amqp.DialConfig(url, amqp.Config(conf))
if err == nil {
return conn, err
}
log.Warnf("failed to connect to amqp server %s: %v", url, err)
errs = append(errs, err)
}
return nil, errors.Join(errs...)
}

// NewConnectionManager creates a new connection manager
func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
conn, err := amqp.DialConfig(url, amqp.Config(conf))
func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
conn, err := dial(log, resolver, amqp.Config(conf))
if err != nil {
return nil, err
}

connManager := ConnectionManager{
logger: log,
url: url,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMux: &sync.RWMutex{},
Expand Down Expand Up @@ -125,7 +152,8 @@ func (connManager *ConnectionManager) reconnectLoop() {
func (connManager *ConnectionManager) reconnect() error {
connManager.connectionMux.Lock()
defer connManager.connectionMux.Unlock()
newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig))

conn, err := dial(connManager.logger, connManager.resolver, amqp.Config(connManager.amqpConfig))
if err != nil {
return err
}
Expand All @@ -134,6 +162,6 @@ func (connManager *ConnectionManager) reconnect() error {
connManager.logger.Warnf("error closing connection while reconnecting: %v", err)
}

connManager.connection = newConn
connManager.connection = conn
return nil
}

0 comments on commit 5802dd2

Please sign in to comment.