Skip to content

Commit

Permalink
feat: Automatically reconnect when the PG listener connection fails
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Apr 23, 2024
1 parent 0105403 commit 37b6732
Showing 1 changed file with 53 additions and 10 deletions.
63 changes: 53 additions & 10 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
// DefaultConnectionTimeout defines the default amount of time that Neoq waits for connections to become available.
DefaultConnectionTimeout = 30 * time.Second
txCtxVarKey contextKey
reconnectWaitTime = 5 * time.Second
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
Expand All @@ -88,6 +89,7 @@ type PgBackend struct {
newQueues chan string // a channel that indicates that new queues are ready to be processed
readyQueues chan string // a channel that indicates which queues are ready to have jobs processed.
listenCancelCh chan context.CancelFunc // cancellation channel for the listenerConn's WaitForNotification call.
listenConnDown chan bool // listenConnDown indicates that the listener connection is down
listenerConn *pgx.Conn // dedicated connection that LISTENs for jobs across all queues
listenerConnMu *sync.RWMutex // listenerConnMu protects the listener connection from concurrent access
logger logging.Logger // backend-wide logger
Expand Down Expand Up @@ -135,6 +137,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
listenerConnMu: &sync.RWMutex{},
mu: &sync.RWMutex{},
listenCancelCh: make(chan context.CancelFunc, 1),
listenConnDown: make(chan bool),
}

// Set all options
Expand Down Expand Up @@ -183,14 +186,10 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
}
}

p.listenerConn, err = p.newListenerConn(ctx)
if err != nil {
p.logger.Error("unable to initialize listener connection", slog.Any("error", err))
return nil, fmt.Errorf("unable to create neoq listener connection: %w", err)
}

// monitor handlers for changes and LISTEN when new queues are added
go p.newQueueMonitor(ctx)
go p.listenerManager(ctx)

p.listenConnDown <- true

p.cron.Start()

Expand All @@ -199,12 +198,36 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
return pb, nil
}

// newQueueMonitor monitors for new queues and instruct's the listener connection to LISTEN for jobs on them
func (p *PgBackend) newQueueMonitor(ctx context.Context) {
// listenerManager manages the LISTENer connection and adding queue to it
func (p *PgBackend) listenerManager(ctx context.Context) {
var err error
for {
select {
case <-ctx.Done():
return
case <-p.listenConnDown:
lc, err := p.newListenerConn(ctx)
if err != nil {
p.logger.Error("listener connection is down, and unable to reconnect", slog.Any("error", err))
continue
}

p.listenerConnMu.Lock()
p.listenerConn = lc
p.mu.Lock()
handlers := p.handlers
p.mu.Unlock()

for queue := range handlers {
_, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue))
if err != nil {
p.logger.Error("unable to listen on queue", slog.Any("error", err), slog.String("queue", queue))
}
}

p.listenerConnMu.Unlock()

p.logger.Debug("worker database connection established")
case newQueue := <-p.newQueues:
p.logger.Debug("configure new handler", "queue", newQueue)
setup_listeners:
Expand All @@ -217,9 +240,19 @@ func (p *PgBackend) newQueueMonitor(ctx context.Context) {
default:
}

p.listenerConnMu.Lock()
lc := p.listenerConn
p.listenerConnMu.Unlock()
if lc == nil || lc.IsClosed() {
p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed")
p.listenConnDown <- true
time.Sleep(reconnectWaitTime)
continue
}

p.listenerConnMu.Lock()
// note: 'LISTEN, channel' is idempotent
_, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue))
_, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue))
p.listenerConnMu.Unlock()
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
Expand Down Expand Up @@ -951,6 +984,16 @@ func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, er
}

p.logger.Error("failed to wait for notification", slog.Any("error", waitErr))

p.listenerConnMu.Lock()
lc := p.listenerConn
p.listenerConnMu.Unlock()
if lc == nil || lc.IsClosed() {
p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed")
p.listenConnDown <- true
time.Sleep(reconnectWaitTime)
}

continue
}

Expand Down

0 comments on commit 37b6732

Please sign in to comment.