Skip to content

Commit

Permalink
Merge pull request #27 from transcovo/TRAN-28143_amqp_con_close
Browse files Browse the repository at this point in the history
TRAN-28143 - feat(connection|event): Handling close event with shutdown
  • Loading branch information
MohamedJeffal authored Feb 27, 2019
2 parents 67a0391 + fb1e814 commit 1cb6b10
Show file tree
Hide file tree
Showing 6 changed files with 2,321 additions and 6 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ Creates a client. Returns a `Promise` of the client.

const client = yield bus.createClient(url);

### client.close()
### client.close(forceClose = false)

Closes the client. Returns a `Promise`.
When passing a truthy `forceClose`, `close()` will also `exit(1)` the current process,
so when handling erronous case that you can't recover from, one should call `close(true)`.

client.close();

Expand Down
24 changes: 23 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const EventEmitter = require('events');
const url = require('url');
const amqplib = require('amqplib');
const co = require('co');
const logger = require('chpr-logger');

const DEFAULT_EXCHANGE_TYPE = 'topic';
const DEFAULT_HEARTBEAT = 10;
Expand All @@ -19,6 +20,7 @@ const DEFAULT_HEARTBEAT = 10;
* @param {Object} [options]
* @param {Number} [options.heartbeat] : the heartbeat that you want to use.
* @param {Boolean} [options.useConfirmChannel] : use a confirm channel
* @param {Number|undefined} [options.processExitTimeout] : the time in ms that is (at least) waited before exiting process.
*/
function* createClient(rabbitmqUrl, options) {
options = options || {};
Expand All @@ -27,6 +29,7 @@ function* createClient(rabbitmqUrl, options) {

const parsedurl = url.parse(rabbitmqUrl);
options.servername = parsedurl.hostname;
options.processExitTimeout = options.processExitTimeout || 0;

const connection = yield amqplib.connect(rabbitmqUrl, options);
const channel = yield (options.useConfirmChannel
Expand All @@ -40,19 +43,38 @@ function* createClient(rabbitmqUrl, options) {
consume,
listen,
publish,
close: co.wrap(function* close() {
close: co.wrap(function* close(forceClose = false) {
if (this.channel !== null) {
yield this.channel.close();
this.channel = null;
}
if (this.connection !== null) {
this.connection.removeListener('close', handleExitOnConnectionClose);
yield this.connection.close();
this.connection = null;
}

if (forceClose) {
yield cb => setTimeout(cb, options.processExitTimeout);
process.exit(1);
}
})
});

busClient.connection.on('close', handleExitOnConnectionClose);

return busClient;

/**
* Handle the close event from the connection by cleaning amqp resources & exiting the process,
* as restart is handled by the instance orchestrator
* @returns {Promise<void>} - Returns the client close promise
*/
function handleExitOnConnectionClose() {
logger.error('[client#handleExitOnConnectionClose] Client connection closed, process will exit');
return busClient.close(true);
}

/**
* Check that exchange and queue are created and bind exchange to queue with rooting key.
*
Expand Down
Loading

0 comments on commit 1cb6b10

Please sign in to comment.