Skip to content

Commit

Permalink
FFM-8116 Stop SSE when Client.close() is called (#72)
Browse files Browse the repository at this point in the history
* FFM-8116 Import abort-controller

* FFM-8116 Add abort request logic

* FFM-8116 Format

* FFM-8116 Update var name

* FFM-8116 Tidyup

* FFM-8116 Tidyup

* FFM-8116 Add backwards compat solution for closing stream

* FFM-8116 Add backwards compat solution for closing stream

* FFM-8116 Add backwards compat solution for closing stream

* FFM-8116 Comment

* FFM-8116 Prettier

* FFM-8116 Prettier

* FFM-8116 1.2.17 release prep

* FFM-8116 Use destroy()
  • Loading branch information
erdirowlands authored Jun 7, 2023
1 parent 5c4d8cc commit 82dc07a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@harnessio/ff-nodejs-server-sdk",
"version": "1.2.16",
"version": "1.2.17",
"description": "Feature flags SDK for NodeJS environments",
"main": "dist/cjs/index.js",
"module": "dist/esm/index.mjs",
Expand Down
51 changes: 30 additions & 21 deletions src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Repository } from './repository';
import { ConsoleLog } from './log';

import https, { RequestOptions } from 'https';
import http from 'http';
import http, { ClientRequest } from 'http';

type FetchFunction = (
identifier: string,
Expand All @@ -17,6 +17,7 @@ type FetchFunction = (
export class StreamProcessor {
static readonly CONNECTED = 1;
static readonly RETRYING = 2;
static readonly CLOSED = 3;
static readonly SSE_TIMEOUT_MS = 30000;

private readonly apiKey: string;
Expand All @@ -28,6 +29,7 @@ export class StreamProcessor {
private readonly retryDelayMs: number;

private options: Options;
private request: ClientRequest;
private eventBus: EventEmitter;
private readyState: number;
private log: ConsoleLog;
Expand Down Expand Up @@ -82,17 +84,19 @@ export class StreamProcessor {
};

const onFailed = (msg: string) => {
this.retryAttempt += 1;

const delayMs = this.getRandomRetryDelayMs();
this.log.warn(`SSE disconnected: ${msg} will retry in ${delayMs}ms`);
this.readyState = StreamProcessor.RETRYING;
this.eventBus.emit(StreamEvent.RETRYING);

setTimeout(() => {
this.log.info('SSE retrying to connect');
this.connect(url, options, onConnected, onFailed);
}, delayMs);
if (this.readyState !== StreamProcessor.CLOSED) {
this.retryAttempt += 1;

const delayMs = this.getRandomRetryDelayMs();
this.log.warn(`SSE disconnected: ${msg} will retry in ${delayMs}ms`);
this.readyState = StreamProcessor.RETRYING;
this.eventBus.emit(StreamEvent.RETRYING);

setTimeout(() => {
this.log.info('SSE retrying to connect');
this.connect(url, options, onConnected, onFailed);
}, delayMs);
}
};

this.connect(url, options, onConnected, onFailed);
Expand All @@ -119,7 +123,7 @@ export class StreamProcessor {
const isSecure = url.startsWith('https:');
this.log.debug('SSE HTTP start request', url);

(isSecure ? https : http)
this.request = (isSecure ? https : http)
.request(url, options, (res) => {
this.log.debug('SSE got HTTP response code', res.statusCode);

Expand All @@ -146,8 +150,8 @@ export class StreamProcessor {
'ms',
);
})
.setTimeout(StreamProcessor.SSE_TIMEOUT_MS)
.end();
.setTimeout(StreamProcessor.SSE_TIMEOUT_MS);
this.request.end();
}

private processData(data: any): void {
Expand Down Expand Up @@ -212,14 +216,19 @@ export class StreamProcessor {
return this.readyState === StreamProcessor.CONNECTED;
}

stop(): void {
this.log.info('Stopping StreamProcessor');
this.eventBus.emit(StreamEvent.DISCONNECTED);
}

close(): void {
if (this.readyState === StreamProcessor.CLOSED) {
this.log.info('SteamProcessor already closed');
return;
}

this.readyState = StreamProcessor.CLOSED;
this.log.info('Closing StreamProcessor');
this.stop();

this.request.destroy();
this.request = undefined;

this.eventBus.emit(StreamEvent.DISCONNECTED);
this.log.info('StreamProcessor closed');
}
}
2 changes: 1 addition & 1 deletion src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const VERSION = '1.2.16';
export const VERSION = "1.2.17";

0 comments on commit 82dc07a

Please sign in to comment.