Skip to content

Commit

Permalink
Merge pull request #28 from transcovo/TRAN-28143_amqp_cleanup
Browse files Browse the repository at this point in the history
TRAN-28143 - feat(connection|event): Handling close cleanup operations
  • Loading branch information
MohamedJeffal authored Mar 4, 2019
2 parents 1cb6b10 + 1b45152 commit 920005d
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 7 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ You can use `listener` as an EventEmitter. It emits the following events =

## Client API

### EventEmitter interface

The client is an EventEmitter`. It emits the following events:

* `close_cleanup`: emitted once, when handling a `close` connection event.
It provides a timeout (defaut = `200ms`) before executing a `client.close(forceClose = true)`,
for the host app to execute any cleanup operations (such as closing a `mongodb` client).

### bus.createClient(url)

Creates a client. Returns a `Promise` of the client.
Expand All @@ -61,7 +69,7 @@ Creates a client. Returns a `Promise` of the client.

Closes the client. Returns a `Promise`.
When passing a truthy `forceClose`, `close()` will also `exit(1)` the current process,
so when handling erronous case that you can't recover from, one should call `close(true)`.
so when handling erroneous case that you can't recover from, one should call `close(true)`.

client.close();

Expand Down
2 changes: 2 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ declare namespace Bus {
export interface BusOptions {
heartbeat?: number;
useConfirmChannel?: Boolean;
processExitCleanupTimeout?: number;
processExitTimeout?: number;
}

export interface PublishOptions {
Expand Down
19 changes: 15 additions & 4 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const DEFAULT_HEARTBEAT = 10;
* @param {Object} [options]
* @param {Number} [options.heartbeat] : the heartbeat that you want to use.
* @param {Boolean} [options.useConfirmChannel] : use a confirm channel
* @param {Number|undefined} [options.processExitCleanupTimeout] : the time in ms that is (at least) waited for close cleanup operations.
* @param {Number|undefined} [options.processExitTimeout] : the time in ms that is (at least) waited before exiting process.
*/
function* createClient(rabbitmqUrl, options) {
Expand All @@ -29,6 +30,8 @@ function* createClient(rabbitmqUrl, options) {

const parsedurl = url.parse(rabbitmqUrl);
options.servername = parsedurl.hostname;

options.processExitCleanupTimeout = options.processExitCleanupTimeout || 200;
options.processExitTimeout = options.processExitTimeout || 0;

const connection = yield amqplib.connect(rabbitmqUrl, options);
Expand Down Expand Up @@ -67,12 +70,20 @@ function* createClient(rabbitmqUrl, options) {

/**
* Handle the close event from the connection by cleaning amqp resources & exiting the process,
* as restart is handled by the instance orchestrator
* @returns {Promise<void>} - Returns the client close promise
* as restart is handled by the instance orchestrator.
* An exit cleanup timeout is provided for the host app to execute some cleanup operations
* @returns {void}
*/
function handleExitOnConnectionClose() {
logger.error('[client#handleExitOnConnectionClose] Client connection closed, process will exit');
return busClient.close(true);
const timeoutClearToken = setTimeout(timeoutHandler, options.processExitCleanupTimeout);

function timeoutHandler() {
clearTimeout(timeoutClearToken);
logger.info('[client#handleExitOnConnectionClose] Client connection closed, process will exit');
return busClient.close(true);
}

busClient.emit('close_cleanup');
}

/**
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@chauffeur-prive/node-amqp-bus",
"version": "5.1.2",
"version": "5.2.0",
"description": "Implement a Bus using AMQP in nodejs",
"repository": {
"type": "git",
Expand Down
75 changes: 74 additions & 1 deletion test/lib/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ describe('Node AMQP Bus Client', function testBus() {
.onFirstCall()
.returns(1);

const busClient = yield createBusClient(URL);
const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 1 });
const busClientCloseSpy = sandbox.spy(busClient, 'close');

busClient.connection.emit('close');
Expand All @@ -83,6 +83,79 @@ describe('Node AMQP Bus Client', function testBus() {
expect(busClientCloseSpy.calledOnce).to.equal(true);
expect(processExitStub.calledOnce).to.equal(true);
});

it('Should emit a close_cleanup event, then exit gracefully when the connection close event is sent', function* it() {
const processExitStub = sandbox.stub(process, 'exit');
processExitStub
.withArgs(1)
.onFirstCall()
.returns(1);

const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 1 });
const busClientCloseSpy = sandbox.spy(busClient, 'close');
let closeCleanupEventEmitted = false;

busClient.on('close_cleanup', () => {
closeCleanupEventEmitted = true;
});

busClient.connection.emit('close');
yield cb => setTimeout(cb, 20);

expect(closeCleanupEventEmitted).to.equal(true);
expect(busClientCloseSpy.calledOnce).to.equal(true);
expect(processExitStub.calledOnce).to.equal(true);
});

it('Should complete the cleanup operation, then exit gracefully when the connection close event is sent', function* it() {
const processExitStub = sandbox.stub(process, 'exit');
processExitStub
.withArgs(1)
.onFirstCall()
.returns(1);

const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 10 });
const busClientCloseSpy = sandbox.spy(busClient, 'close');
let closeCleanupEventDone = false;

busClient.on('close_cleanup', () => {
setTimeout(() => {
closeCleanupEventDone = true;
}, 5);
});

busClient.connection.emit('close');
yield cb => setTimeout(cb, 30);

expect(closeCleanupEventDone).to.equal(true);
expect(busClientCloseSpy.calledOnce).to.equal(true);
expect(processExitStub.calledOnce).to.equal(true);
});

it('Should interrupt the cleanup operation when it exceeds the defined timeout, by exiting gracefully when the connection close event is sent', function* it() {
const processExitStub = sandbox.stub(process, 'exit');
processExitStub
.withArgs(1)
.onFirstCall()
.returns(1);

const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 1 });
const busClientCloseSpy = sandbox.spy(busClient, 'close');
let forceCloseCleanupEventDone = false;

busClient.on('close_cleanup', () => {
setTimeout(() => {
forceCloseCleanupEventDone = true;
}, 20);
});

busClient.connection.emit('close');
yield cb => setTimeout(cb, 10);

expect(forceCloseCleanupEventDone).to.equal(false);
expect(busClientCloseSpy.calledOnce).to.equal(true);
expect(processExitStub.calledOnce).to.equal(true);
});
});

describe('#close', () => {
Expand Down

0 comments on commit 920005d

Please sign in to comment.