Skip to content

Commit 83da2c3

Browse files
committed
add reconnect for amqp
1 parent 762d8a7 commit 83da2c3

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

src/services/RabbitMQService.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,23 +98,36 @@ export class RabbitMQService {
9898

9999
private async connectAsync(): Promise<void> {
100100
this._connection = await connect(this._configManager.amqpServerUrl() || this._configManager.amqpConfig());
101-
this._connection.on('close', () => {
101+
this._connection.on('error', (error: any) => {
102102
if (this._connected) {
103-
logger.warn('reconnect in 5 seconds');
104-
this.connectAsync()
105-
.then(() => {
106-
logger.info('reconnect successfully');
107-
})
108-
.catch((err) => {
109-
capture(err);
110-
logger.error(err);
111-
});
103+
capture(error);
104+
this.reconnect();
105+
}
106+
});
107+
this._connection.on('close', (error: any) => {
108+
if (this._connected && error) {
109+
capture(error);
110+
this.reconnect();
112111
}
113112
})
114113
this._connected = true;
115114
await this.resendMessageInFailedQueue();
116115
}
117116

117+
private reconnect(): void {
118+
logger.warn('reconnect in 5 seconds');
119+
setTimeout(() => {
120+
this.connectAsync()
121+
.then(() => {
122+
logger.info('reconnect successfully');
123+
})
124+
.catch((err) => {
125+
capture(err);
126+
logger.error(err);
127+
})
128+
}, 5000);
129+
};
130+
118131
public async consume(queueName: string, onMessage: (msg: MQMessage) => Promise<boolean>): Promise<string> {
119132
const exchangeName = this._queues.get(queueName);
120133
const channel = this._channels.get(exchangeName);

0 commit comments

Comments
 (0)