From 1b45152f319c631a328fe56ce00150469ca248ad Mon Sep 17 00:00:00 2001 From: Mohamed Jeffal Date: Wed, 27 Feb 2019 22:56:46 +0100 Subject: [PATCH] TRAN-28143 - feat(connection|event): Handling close cleanup operations --- README.md | 10 +++++- index.d.ts | 2 ++ lib/client.js | 19 ++++++++--- package.json | 2 +- test/lib/client.test.js | 75 ++++++++++++++++++++++++++++++++++++++++- 5 files changed, 101 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index ef9f55d..a479943 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,14 @@ You can use `listener` as an EventEmitter. It emits the following events = ## Client API +### EventEmitter interface + +The client is an EventEmitter`. It emits the following events: + +* `close_cleanup`: emitted once, when handling a `close` connection event. + It provides a timeout (defaut = `200ms`) before executing a `client.close(forceClose = true)`, + for the host app to execute any cleanup operations (such as closing a `mongodb` client). + ### bus.createClient(url) Creates a client. Returns a `Promise` of the client. @@ -61,7 +69,7 @@ Creates a client. Returns a `Promise` of the client. Closes the client. Returns a `Promise`. When passing a truthy `forceClose`, `close()` will also `exit(1)` the current process, -so when handling erronous case that you can't recover from, one should call `close(true)`. +so when handling erroneous case that you can't recover from, one should call `close(true)`. client.close(); diff --git a/index.d.ts b/index.d.ts index 244465c..777e069 100644 --- a/index.d.ts +++ b/index.d.ts @@ -29,6 +29,8 @@ declare namespace Bus { export interface BusOptions { heartbeat?: number; useConfirmChannel?: Boolean; + processExitCleanupTimeout?: number; + processExitTimeout?: number; } export interface PublishOptions { diff --git a/lib/client.js b/lib/client.js index d159386..1aa31c0 100644 --- a/lib/client.js +++ b/lib/client.js @@ -20,6 +20,7 @@ const DEFAULT_HEARTBEAT = 10; * @param {Object} [options] * @param {Number} [options.heartbeat] : the heartbeat that you want to use. * @param {Boolean} [options.useConfirmChannel] : use a confirm channel + * @param {Number|undefined} [options.processExitCleanupTimeout] : the time in ms that is (at least) waited for close cleanup operations. * @param {Number|undefined} [options.processExitTimeout] : the time in ms that is (at least) waited before exiting process. */ function* createClient(rabbitmqUrl, options) { @@ -29,6 +30,8 @@ function* createClient(rabbitmqUrl, options) { const parsedurl = url.parse(rabbitmqUrl); options.servername = parsedurl.hostname; + + options.processExitCleanupTimeout = options.processExitCleanupTimeout || 200; options.processExitTimeout = options.processExitTimeout || 0; const connection = yield amqplib.connect(rabbitmqUrl, options); @@ -67,12 +70,20 @@ function* createClient(rabbitmqUrl, options) { /** * Handle the close event from the connection by cleaning amqp resources & exiting the process, - * as restart is handled by the instance orchestrator - * @returns {Promise} - Returns the client close promise + * as restart is handled by the instance orchestrator. + * An exit cleanup timeout is provided for the host app to execute some cleanup operations + * @returns {void} */ function handleExitOnConnectionClose() { - logger.error('[client#handleExitOnConnectionClose] Client connection closed, process will exit'); - return busClient.close(true); + const timeoutClearToken = setTimeout(timeoutHandler, options.processExitCleanupTimeout); + + function timeoutHandler() { + clearTimeout(timeoutClearToken); + logger.info('[client#handleExitOnConnectionClose] Client connection closed, process will exit'); + return busClient.close(true); + } + + busClient.emit('close_cleanup'); } /** diff --git a/package.json b/package.json index ea846bc..717fa8f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@chauffeur-prive/node-amqp-bus", - "version": "5.1.2", + "version": "5.2.0", "description": "Implement a Bus using AMQP in nodejs", "repository": { "type": "git", diff --git a/test/lib/client.test.js b/test/lib/client.test.js index 27ecc7e..3b7d54c 100644 --- a/test/lib/client.test.js +++ b/test/lib/client.test.js @@ -74,7 +74,7 @@ describe('Node AMQP Bus Client', function testBus() { .onFirstCall() .returns(1); - const busClient = yield createBusClient(URL); + const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 1 }); const busClientCloseSpy = sandbox.spy(busClient, 'close'); busClient.connection.emit('close'); @@ -83,6 +83,79 @@ describe('Node AMQP Bus Client', function testBus() { expect(busClientCloseSpy.calledOnce).to.equal(true); expect(processExitStub.calledOnce).to.equal(true); }); + + it('Should emit a close_cleanup event, then exit gracefully when the connection close event is sent', function* it() { + const processExitStub = sandbox.stub(process, 'exit'); + processExitStub + .withArgs(1) + .onFirstCall() + .returns(1); + + const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 1 }); + const busClientCloseSpy = sandbox.spy(busClient, 'close'); + let closeCleanupEventEmitted = false; + + busClient.on('close_cleanup', () => { + closeCleanupEventEmitted = true; + }); + + busClient.connection.emit('close'); + yield cb => setTimeout(cb, 20); + + expect(closeCleanupEventEmitted).to.equal(true); + expect(busClientCloseSpy.calledOnce).to.equal(true); + expect(processExitStub.calledOnce).to.equal(true); + }); + + it('Should complete the cleanup operation, then exit gracefully when the connection close event is sent', function* it() { + const processExitStub = sandbox.stub(process, 'exit'); + processExitStub + .withArgs(1) + .onFirstCall() + .returns(1); + + const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 10 }); + const busClientCloseSpy = sandbox.spy(busClient, 'close'); + let closeCleanupEventDone = false; + + busClient.on('close_cleanup', () => { + setTimeout(() => { + closeCleanupEventDone = true; + }, 5); + }); + + busClient.connection.emit('close'); + yield cb => setTimeout(cb, 30); + + expect(closeCleanupEventDone).to.equal(true); + expect(busClientCloseSpy.calledOnce).to.equal(true); + expect(processExitStub.calledOnce).to.equal(true); + }); + + it('Should interrupt the cleanup operation when it exceeds the defined timeout, by exiting gracefully when the connection close event is sent', function* it() { + const processExitStub = sandbox.stub(process, 'exit'); + processExitStub + .withArgs(1) + .onFirstCall() + .returns(1); + + const busClient = yield createBusClient(URL, { processExitCleanupTimeout: 1 }); + const busClientCloseSpy = sandbox.spy(busClient, 'close'); + let forceCloseCleanupEventDone = false; + + busClient.on('close_cleanup', () => { + setTimeout(() => { + forceCloseCleanupEventDone = true; + }, 20); + }); + + busClient.connection.emit('close'); + yield cb => setTimeout(cb, 10); + + expect(forceCloseCleanupEventDone).to.equal(false); + expect(busClientCloseSpy.calledOnce).to.equal(true); + expect(processExitStub.calledOnce).to.equal(true); + }); }); describe('#close', () => {