From 82dc07a2e013af6906f23c184a870f2954e8e7c9 Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Wed, 7 Jun 2023 11:26:04 +0100 Subject: [PATCH] FFM-8116 Stop SSE when `Client.close()` is called (#72) * FFM-8116 Import abort-controller * FFM-8116 Add abort request logic * FFM-8116 Format * FFM-8116 Update var name * FFM-8116 Tidyup * FFM-8116 Tidyup * FFM-8116 Add backwards compat solution for closing stream * FFM-8116 Add backwards compat solution for closing stream * FFM-8116 Add backwards compat solution for closing stream * FFM-8116 Comment * FFM-8116 Prettier * FFM-8116 Prettier * FFM-8116 1.2.17 release prep * FFM-8116 Use destroy() --- package-lock.json | 4 ++-- package.json | 2 +- src/streaming.ts | 51 ++++++++++++++++++++++++++++------------------- src/version.ts | 2 +- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/package-lock.json b/package-lock.json index 36e2fde..09de29f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@harnessio/ff-nodejs-server-sdk", - "version": "1.2.16", + "version": "1.2.17", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@harnessio/ff-nodejs-server-sdk", - "version": "1.2.16", + "version": "1.2.17", "bundleDependencies": [ "axios", "eventsource", diff --git a/package.json b/package.json index be8467b..4351853 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@harnessio/ff-nodejs-server-sdk", - "version": "1.2.16", + "version": "1.2.17", "description": "Feature flags SDK for NodeJS environments", "main": "dist/cjs/index.js", "module": "dist/esm/index.mjs", diff --git a/src/streaming.ts b/src/streaming.ts index 9759e68..4009d7e 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -6,7 +6,7 @@ import { Repository } from './repository'; import { ConsoleLog } from './log'; import https, { RequestOptions } from 'https'; -import http from 'http'; +import http, { ClientRequest } from 'http'; type FetchFunction = ( identifier: string, @@ -17,6 +17,7 @@ type FetchFunction = ( export class StreamProcessor { static readonly CONNECTED = 1; static readonly RETRYING = 2; + static readonly CLOSED = 3; static readonly SSE_TIMEOUT_MS = 30000; private readonly apiKey: string; @@ -28,6 +29,7 @@ export class StreamProcessor { private readonly retryDelayMs: number; private options: Options; + private request: ClientRequest; private eventBus: EventEmitter; private readyState: number; private log: ConsoleLog; @@ -82,17 +84,19 @@ export class StreamProcessor { }; const onFailed = (msg: string) => { - this.retryAttempt += 1; - - const delayMs = this.getRandomRetryDelayMs(); - this.log.warn(`SSE disconnected: ${msg} will retry in ${delayMs}ms`); - this.readyState = StreamProcessor.RETRYING; - this.eventBus.emit(StreamEvent.RETRYING); - - setTimeout(() => { - this.log.info('SSE retrying to connect'); - this.connect(url, options, onConnected, onFailed); - }, delayMs); + if (this.readyState !== StreamProcessor.CLOSED) { + this.retryAttempt += 1; + + const delayMs = this.getRandomRetryDelayMs(); + this.log.warn(`SSE disconnected: ${msg} will retry in ${delayMs}ms`); + this.readyState = StreamProcessor.RETRYING; + this.eventBus.emit(StreamEvent.RETRYING); + + setTimeout(() => { + this.log.info('SSE retrying to connect'); + this.connect(url, options, onConnected, onFailed); + }, delayMs); + } }; this.connect(url, options, onConnected, onFailed); @@ -119,7 +123,7 @@ export class StreamProcessor { const isSecure = url.startsWith('https:'); this.log.debug('SSE HTTP start request', url); - (isSecure ? https : http) + this.request = (isSecure ? https : http) .request(url, options, (res) => { this.log.debug('SSE got HTTP response code', res.statusCode); @@ -146,8 +150,8 @@ export class StreamProcessor { 'ms', ); }) - .setTimeout(StreamProcessor.SSE_TIMEOUT_MS) - .end(); + .setTimeout(StreamProcessor.SSE_TIMEOUT_MS); + this.request.end(); } private processData(data: any): void { @@ -212,14 +216,19 @@ export class StreamProcessor { return this.readyState === StreamProcessor.CONNECTED; } - stop(): void { - this.log.info('Stopping StreamProcessor'); - this.eventBus.emit(StreamEvent.DISCONNECTED); - } - close(): void { + if (this.readyState === StreamProcessor.CLOSED) { + this.log.info('SteamProcessor already closed'); + return; + } + + this.readyState = StreamProcessor.CLOSED; this.log.info('Closing StreamProcessor'); - this.stop(); + + this.request.destroy(); + this.request = undefined; + + this.eventBus.emit(StreamEvent.DISCONNECTED); this.log.info('StreamProcessor closed'); } } diff --git a/src/version.ts b/src/version.ts index 3246cb4..eefdadd 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const VERSION = '1.2.16'; +export const VERSION = "1.2.17";