From 5802dd228698558622f87ae5bfe681c35dc9f4b0 Mon Sep 17 00:00:00 2001 From: Ruslan Bayandinov Date: Mon, 3 Jun 2024 12:50:20 +0700 Subject: [PATCH] Add cluster support via Resolver Signed-off-by: Ruslan Bayandinov --- connection.go | 40 ++++++++++++++++--- examples/cluster/main.go | 25 ++++++++++++ .../connectionmanager/connection_manager.go | 40 ++++++++++++++++--- 3 files changed, 93 insertions(+), 12 deletions(-) create mode 100644 examples/cluster/main.go diff --git a/connection.go b/connection.go index 97b8bb4..0b9c5a8 100644 --- a/connection.go +++ b/connection.go @@ -1,6 +1,8 @@ package rabbitmq import ( + "math/rand" + amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) @@ -21,19 +23,46 @@ type Conn struct { // will be stored in the returned connection's Config field. type Config amqp.Config +type Resolver = connectionmanager.Resolver + +type StaticResolver struct { + urls []string + shuffe bool +} + +func (r *StaticResolver) Resolve() ([]string, error) { + // TODO: move to slices.Clone when supported Go versions > 1.21 + var urls []string + urls = append(urls, r.urls...) + + if r.shuffe { + rand.Shuffle(len(urls), func(i, j int) { + urls[i], urls[j] = urls[j], urls[i] + }) + } + return urls, nil +} + +func NewStaticResolver(urls []string, shuffle bool) *StaticResolver { + return &StaticResolver{urls: urls} +} + // NewConn creates a new connection manager -func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) { +func NewConn(url string, opts ...func(*ConnectionOptions)) (*Conn, error) { + return NewClusterConn(NewStaticResolver([]string{url}, false), opts...) +} + +func NewClusterConn(resolver Resolver, opts ...func(*ConnectionOptions)) (*Conn, error) { defaultOptions := getDefaultConnectionOptions() options := &defaultOptions - for _, optionFunc := range optionFuncs { - optionFunc(options) + for _, optFn := range opts { + optFn(options) } - manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) + manager, err := connectionmanager.NewConnectionManager(resolver, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) if err != nil { return nil, err } - reconnectErrCh, closeCh := manager.NotifyReconnect() conn := &Conn{ connectionManager: manager, @@ -41,7 +70,6 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) closeConnectionToManagerCh: closeCh, options: *options, } - go conn.handleRestarts() return conn, nil } diff --git a/examples/cluster/main.go b/examples/cluster/main.go new file mode 100644 index 0000000..24513c3 --- /dev/null +++ b/examples/cluster/main.go @@ -0,0 +1,25 @@ +package main + +import ( + "log" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +func main() { + resolver := rabbitmq.NewStaticResolver( + []string{ + "amqp://guest:guest@host1", + "amqp://guest:guest@host2", + "amqp://guest:guest@host3", + }, + false, /* shuffle */ + ) + + conn, err := rabbitmq.NewClusterConn(resolver) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + +} diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index fce1f2b..541c57f 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -1,6 +1,8 @@ package connectionmanager import ( + "errors" + "fmt" "sync" "time" @@ -12,7 +14,7 @@ import ( // ConnectionManager - type ConnectionManager struct { logger logger.Logger - url string + resolver Resolver connection *amqp.Connection amqpConfig amqp.Config connectionMux *sync.RWMutex @@ -22,15 +24,40 @@ type ConnectionManager struct { dispatcher *dispatcher.Dispatcher } +type Resolver interface { + Resolve() ([]string, error) +} + +// dial will attempt to connect to the a list of urls in the order they are +// given. +func dial(log logger.Logger, resolver Resolver, conf amqp.Config) (*amqp.Connection, error) { + urls, err := resolver.Resolve() + if err != nil { + return nil, fmt.Errorf("error resolving amqp server urls: %w", err) + } + + var errs []error + for _, url := range urls { + conn, err := amqp.DialConfig(url, amqp.Config(conf)) + if err == nil { + return conn, err + } + log.Warnf("failed to connect to amqp server %s: %v", url, err) + errs = append(errs, err) + } + return nil, errors.Join(errs...) +} + // NewConnectionManager creates a new connection manager -func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { - conn, err := amqp.DialConfig(url, amqp.Config(conf)) +func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { + conn, err := dial(log, resolver, amqp.Config(conf)) if err != nil { return nil, err } + connManager := ConnectionManager{ logger: log, - url: url, + resolver: resolver, connection: conn, amqpConfig: conf, connectionMux: &sync.RWMutex{}, @@ -125,7 +152,8 @@ func (connManager *ConnectionManager) reconnectLoop() { func (connManager *ConnectionManager) reconnect() error { connManager.connectionMux.Lock() defer connManager.connectionMux.Unlock() - newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig)) + + conn, err := dial(connManager.logger, connManager.resolver, amqp.Config(connManager.amqpConfig)) if err != nil { return err } @@ -134,6 +162,6 @@ func (connManager *ConnectionManager) reconnect() error { connManager.logger.Warnf("error closing connection while reconnecting: %v", err) } - connManager.connection = newConn + connManager.connection = conn return nil }