diff --git a/src/services/RabbitMQService.ts b/src/services/RabbitMQService.ts index eaf8022..b5c8667 100644 --- a/src/services/RabbitMQService.ts +++ b/src/services/RabbitMQService.ts @@ -98,23 +98,36 @@ export class RabbitMQService { private async connectAsync(): Promise { this._connection = await connect(this._configManager.amqpServerUrl() || this._configManager.amqpConfig()); - this._connection.on('close', () => { + this._connection.on('error', (error: any) => { if (this._connected) { - logger.warn('reconnect in 5 seconds'); - this.connectAsync() - .then(() => { - logger.info('reconnect successfully'); - }) - .catch((err) => { - capture(err); - logger.error(err); - }); + capture(error); + this.reconnect(); + } + }); + this._connection.on('close', (error: any) => { + if (this._connected && error) { + capture(error); + this.reconnect(); } }) this._connected = true; await this.resendMessageInFailedQueue(); } + private reconnect(): void { + logger.warn('reconnect in 5 seconds'); + setTimeout(() => { + this.connectAsync() + .then(() => { + logger.info('reconnect successfully'); + }) + .catch((err) => { + capture(err); + logger.error(err); + }) + }, 5000); + }; + public async consume(queueName: string, onMessage: (msg: MQMessage) => Promise): Promise { const exchangeName = this._queues.get(queueName); const channel = this._channels.get(exchangeName);