diff --git a/src/AmqpClient.ts b/src/AmqpClient.ts index 0ffbd87..d47e12e 100644 --- a/src/AmqpClient.ts +++ b/src/AmqpClient.ts @@ -17,6 +17,8 @@ export class AmqpClient { private amqpConfig: AmqpConfig private exchangeConfig: ExchangeConfig private consumers: Consumer[] = [] + private isReconnecting = false + public connection: Connection public channel: Channel @@ -61,25 +63,32 @@ export class AmqpClient { this.connection.on('error', (e): void => { log.error(e) + this.close() }) this.connection.on('close', this.reconnect.bind(this)) } private async reconnect(): Promise { - log.warn('AMQP connection closed!') - const { autoReconnect, retryConnectionInterval } = this.amqpConfig - - if (autoReconnect) { - log.info('Attempting to reconnect...') - setTimeout(async () => { - try { - await this.init(this.exchangeConfig) - log.warn('Reconnection successful!') - } catch (e) { - log.info('Unable to reconnect: ', e) - } - }, retryConnectionInterval) + if (!this.isReconnecting) { + this.isReconnecting = true + log.warn('AMQP connection closed!') + const { autoReconnect, retryConnectionInterval } = this.amqpConfig + + if (autoReconnect) { + log.info('Attempting to reconnect...') + setTimeout(async () => { + try { + await this.init(this.exchangeConfig) + this.isReconnecting = false + log.warn('Reconnection successful!') + } catch (e) { + this.isReconnecting = false + log.info('Unable to reconnect: ', e) + this.reconnect() + } + }, retryConnectionInterval) + } } }