diff --git a/README.md b/README.md index 8e1c807..52bb912 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,32 @@ const client = initialize( ) ``` +Max Stream Retries +You can configure the maximum number of streaming retries before the SDK stops attempting to reconnect or falls back to polling (if enabled). The maxRetries option can be set to any positive number or Infinity for unlimited retries (which is the default). + +```typescript +const options = { + maxRetries: 5, // Set the maximum number of retries for streaming. Default is Infinity. + streamEnabled: true, + pollingEnabled: true, + pollingInterval: 60000, +} + +const client = initialize( + 'YOUR_SDK_KEY', + { + identifier: 'Harness1', + attributes: { + lastUpdated: Date(), + host: location.href + } + }, + options +) + +``` +If maxRetries is reached and pollingEnabled is true, the SDK will stay in polling mode. If pollingEnabled is false, the SDK will not poll, and evaluations will not be updated until the SDK Client is initialized again, for example if the app or page is restarted. + ## Listening to events from the `client` instance. ```typescript diff --git a/package-lock.json b/package-lock.json index 78168e2..5407278 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@harnessio/ff-javascript-client-sdk", - "version": "1.26.3", + "version": "1.27.0-rc.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@harnessio/ff-javascript-client-sdk", - "version": "1.26.3", + "version": "1.27.0-rc.0", "license": "Apache-2.0", "dependencies": { "jwt-decode": "^3.1.2", diff --git a/package.json b/package.json index 2a2f1b0..dbcc5c5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@harnessio/ff-javascript-client-sdk", - "version": "1.26.3", + "version": "1.27.0-rc.0", "author": "Harness", "license": "Apache-2.0", "main": "dist/sdk.cjs.js", diff --git a/src/__tests__/stream.test.ts b/src/__tests__/stream.test.ts new file mode 100644 index 0000000..c3bca50 --- /dev/null +++ b/src/__tests__/stream.test.ts @@ -0,0 +1,247 @@ +import { Streamer } from '../stream' +import type { Options } from '../types' +import { Event } from '../types' +import { getRandom } from '../utils' +import type { Emitter } from 'mitt' +import type Poller from "../poller"; + +jest.useFakeTimers() + +jest.mock('../utils.ts', () => ({ + getRandom: jest.fn() +})) + +const mockEventBus: Emitter = { + emit: jest.fn(), + on: jest.fn(), + off: jest.fn(), + all: new Map() +} + +const mockXHR = { + open: jest.fn(), + setRequestHeader: jest.fn(), + send: jest.fn(), + abort: jest.fn(), + status: 0, + responseText: '', + onload: null, + onerror: null, + onprogress: null, + onabort: null, + ontimeout: null +} + +global.XMLHttpRequest = jest.fn(() => mockXHR) as unknown as jest.MockedClass + +const logError = jest.fn() +const logDebug = jest.fn() + +const getStreamer = (overrides: Partial = {}, maxRetries: number = Infinity): Streamer => { + const options: Options = { + baseUrl: 'http://test', + eventUrl: 'http://event', + pollingInterval: 60000, + debug: true, + pollingEnabled: true, + streamEnabled: true, + ...overrides + } + + return new Streamer( + mockEventBus, + options, + `${options.baseUrl}/stream`, + 'test-api-key', + { 'Test-Header': 'value' }, + { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller, + logDebug, + logError, + jest.fn(), + maxRetries + ) +} + +describe('Streamer', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + it('should connect and emit CONNECTED event', () => { + const streamer = getStreamer({}, 3) + + streamer.start() + expect(mockXHR.open).toHaveBeenCalledWith('GET', 'http://test/stream') + expect(mockXHR.send).toHaveBeenCalled() + + mockXHR.onprogress({} as ProgressEvent) + expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED) + }) + + it('should reconnect successfully after multiple failures', () => { + const streamer = getStreamer({}, 5) + + streamer.start() + expect(mockXHR.send).toHaveBeenCalled() + + for (let i = 0; i < 3; i++) { + mockXHR.onerror({} as ProgressEvent) + jest.advanceTimersByTime(getRandom(1000, 10000)) + } + + // Simulate a successful connection on the next attempt + mockXHR.onprogress({} as ProgressEvent) + + expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED) + expect(mockXHR.send).toHaveBeenCalledTimes(4) // Should attempt to reconnect 3 times before succeeding + }) + + it('should retry connecting on error and eventually fallback to polling', () => { + const streamer = getStreamer() + + streamer.start() + expect(mockXHR.send).toHaveBeenCalled() + + for (let i = 0; i < 3; i++) { + mockXHR.onerror({} as ProgressEvent) + jest.advanceTimersByTime(getRandom(1000, 10000)) + } + + expect(mockEventBus.emit).toHaveBeenCalledWith(Event.DISCONNECTED) + }) + + it('should not retry after max retries are exhausted', () => { + const streamer = getStreamer({}, 3) + + streamer.start() + expect(mockXHR.send).toHaveBeenCalled() + + for (let i = 0; i < 3; i++) { + mockXHR.onerror({} as ProgressEvent) + jest.advanceTimersByTime(getRandom(1000, 10000)) + } + + mockXHR.onerror({} as ProgressEvent) + expect(logError).toHaveBeenCalledWith('Streaming: Max streaming retries reached. Staying in polling mode.') + expect(mockEventBus.emit).toHaveBeenCalledWith(Event.DISCONNECTED) + expect(mockXHR.send).toHaveBeenCalledTimes(3) // Should not send after max retries + }) + + it('should fallback to polling on stream failure', () => { + const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller + const streamer = new Streamer( + mockEventBus, + { baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true }, + 'http://test/stream', + 'test-api-key', + { 'Test-Header': 'value' }, + poller, + logDebug, + logError, + jest.fn(), + Infinity + ) + + streamer.start() + expect(mockXHR.send).toHaveBeenCalled() + + mockXHR.onerror({} as ProgressEvent) + jest.advanceTimersByTime(getRandom(1000, 10000)) + + expect(poller.start).toHaveBeenCalled() + expect(logDebug).toHaveBeenCalledWith('Streaming: Falling back to polling mode while stream recovers') + }) + + it('should stop polling when close is called if in fallback polling mode', () => { + const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller + ;(poller.isPolling as jest.Mock) + .mockImplementationOnce(() => false) + .mockImplementationOnce(() => true) + + const streamer = new Streamer( + mockEventBus, + { baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true }, + 'http://test/stream', + 'test-api-key', + { 'Test-Header': 'value' }, + poller, + logDebug, + logError, + jest.fn(), + 3 + ) + + streamer.start() + expect(mockXHR.send).toHaveBeenCalled() + + // Simulate stream failure and fallback to polling + mockXHR.onerror({} as ProgressEvent) + jest.advanceTimersByTime(getRandom(1000, 10000)) + + // Ensure polling has started + expect(poller.start).toHaveBeenCalled() + + // Now close the streamer + streamer.close() + + expect(mockXHR.abort).toHaveBeenCalled() + expect(poller.stop).toHaveBeenCalled() + expect(mockEventBus.emit).toHaveBeenCalledWith(Event.STOPPED) + }) + + it('should stop streaming but not call poller.stop if not in fallback polling mode when close is called', () => { + const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn().mockReturnValue(false) } as unknown as Poller + const streamer = new Streamer( + mockEventBus, + { baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true }, + 'http://test/stream', + 'test-api-key', + { 'Test-Header': 'value' }, + poller, + logDebug, + logError, + jest.fn(), + 3 + ) + + streamer.start() + streamer.close() + + expect(mockXHR.abort).toHaveBeenCalled() + expect(poller.stop).not.toHaveBeenCalled() + expect(mockEventBus.emit).toHaveBeenCalledWith(Event.STOPPED) + }) + + it('should retry indefinitely if maxRetries is set to Infinity', () => { + const streamer = getStreamer() + + streamer.start() + expect(mockXHR.send).toHaveBeenCalled() + + for (let i = 0; i < 100; i++) { + mockXHR.onerror({} as ProgressEvent) + jest.advanceTimersByTime(getRandom(1000, 10000)) + } + + expect(logError).not.toHaveBeenCalledWith('Streaming: Max streaming retries reached. Staying in polling mode.') + expect(mockXHR.send).toHaveBeenCalledTimes(101) + }) + + it('should reconnect successfully after multiple failures', () => { + const streamer = getStreamer({}, 5) + + streamer.start() + expect(mockXHR.send).toHaveBeenCalled() + + for (let i = 0; i < 3; i++) { + mockXHR.onerror({} as ProgressEvent) + jest.advanceTimersByTime(getRandom(1000, 10000)) + } + + // Simulate a successful connection on the next attempt + mockXHR.onprogress({} as ProgressEvent) + + expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED) + expect(mockXHR.send).toHaveBeenCalledTimes(4) // Should attempt to reconnect 3 times before succeeding + }) +}) diff --git a/src/index.ts b/src/index.ts index 944b787..2a3b038 100644 --- a/src/index.ts +++ b/src/index.ts @@ -543,7 +543,8 @@ const initialize = (apiKey: string, target: Target, options?: Options): Result = } else if (event.domain === 'target-segment') { handleSegmentEvent(event) } - } + }, + configurations.maxStreamRetries ) eventSource.start() } diff --git a/src/stream.ts b/src/stream.ts index a7cbce3..49a169f 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -11,7 +11,8 @@ export class Streamer { private readTimeoutCheckerId: any private connectionOpened = false private disconnectEventEmitted = false - private reconnectAttempts = 0 + private reconnectAttempts = 0 + private retriesExhausted: boolean = false constructor( private eventBus: Emitter, @@ -22,7 +23,8 @@ export class Streamer { private fallbackPoller: Poller, private logDebug: (...data: any[]) => void, private logError: (...data: any[]) => void, - private eventCallback: (e: StreamEvent) => void + private eventCallback: (e: StreamEvent) => void, + private maxRetries: number ) {} start() { @@ -60,10 +62,26 @@ export class Streamer { ) } + if (this.reconnectAttempts >= this.maxRetries) { + this.retriesExhausted = true + if (this.configurations.pollingEnabled) { + this.logErrorMessage('Max streaming retries reached. Staying in polling mode.') + } else { + this.logErrorMessage( + 'Max streaming retries reached. Polling mode is disabled and will receive no further flag updates until SDK client is restarted.' + ) + } + return + } + setTimeout(() => this.start(), reconnectDelayMs) } const onFailed = (msg: string) => { + if (this.retriesExhausted) { + return + } + if (!!msg) { this.logDebugMessage('Stream has issue', msg) } diff --git a/src/types.ts b/src/types.ts index 6a95b7c..127e7d7 100644 --- a/src/types.ts +++ b/src/types.ts @@ -144,6 +144,12 @@ export interface Options { * @default console */ logger?: Logger + + /** + * By default, the stream will attempt to reconnect indefinitely if it disconnects. Use this option to limit + * the number of attempts it will make. + */ + maxStreamRetries?: number } export interface MetricsInfo { diff --git a/src/utils.ts b/src/utils.ts index aba4d9a..f543727 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -10,7 +10,8 @@ export const defaultOptions: Options = { eventsSyncInterval: MIN_EVENTS_SYNC_INTERVAL, pollingInterval: MIN_POLLING_INTERVAL, streamEnabled: true, - cache: false + cache: false, + maxStreamRetries: Infinity } export const getConfiguration = (options: Options): Options => {