Skip to content

Commit

Permalink
Merge pull request #10 from irohalab/reconnect-amqp
Browse files Browse the repository at this point in the history
add reconnect for amqp
  • Loading branch information
EverettSummer authored Mar 10, 2022
2 parents 762d8a7 + 83da2c3 commit 104836b
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions src/services/RabbitMQService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,36 @@ export class RabbitMQService {

private async connectAsync(): Promise<void> {
this._connection = await connect(this._configManager.amqpServerUrl() || this._configManager.amqpConfig());
this._connection.on('close', () => {
this._connection.on('error', (error: any) => {
if (this._connected) {
logger.warn('reconnect in 5 seconds');
this.connectAsync()
.then(() => {
logger.info('reconnect successfully');
})
.catch((err) => {
capture(err);
logger.error(err);
});
capture(error);
this.reconnect();
}
});
this._connection.on('close', (error: any) => {
if (this._connected && error) {
capture(error);
this.reconnect();
}
})
this._connected = true;
await this.resendMessageInFailedQueue();
}

private reconnect(): void {
logger.warn('reconnect in 5 seconds');
setTimeout(() => {
this.connectAsync()
.then(() => {
logger.info('reconnect successfully');
})
.catch((err) => {
capture(err);
logger.error(err);
})
}, 5000);
};

public async consume(queueName: string, onMessage: (msg: MQMessage) => Promise<boolean>): Promise<string> {
const exchangeName = this._queues.get(queueName);
const channel = this._channels.get(exchangeName);
Expand Down

0 comments on commit 104836b

Please sign in to comment.