Skip to content

Commit

Permalink
FFM-11788 Add maxStreamRetries config option (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
erdirowlands authored Jul 31, 2024
1 parent a62e831 commit 1e4218b
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 7 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,32 @@ const client = initialize(
)
```

Max Stream Retries
You can configure the maximum number of streaming retries before the SDK stops attempting to reconnect or falls back to polling (if enabled). The maxRetries option can be set to any positive number or Infinity for unlimited retries (which is the default).

```typescript
const options = {
maxRetries: 5, // Set the maximum number of retries for streaming. Default is Infinity.
streamEnabled: true,
pollingEnabled: true,
pollingInterval: 60000,
}

const client = initialize(
'YOUR_SDK_KEY',
{
identifier: 'Harness1',
attributes: {
lastUpdated: Date(),
host: location.href
}
},
options
)

```
If maxRetries is reached and pollingEnabled is true, the SDK will stay in polling mode. If pollingEnabled is false, the SDK will not poll, and evaluations will not be updated until the SDK Client is initialized again, for example if the app or page is restarted.

## Listening to events from the `client` instance.

```typescript
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@harnessio/ff-javascript-client-sdk",
"version": "1.26.3",
"version": "1.27.0-rc.0",
"author": "Harness",
"license": "Apache-2.0",
"main": "dist/sdk.cjs.js",
Expand Down
247 changes: 247 additions & 0 deletions src/__tests__/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import { Streamer } from '../stream'
import type { Options } from '../types'
import { Event } from '../types'
import { getRandom } from '../utils'
import type { Emitter } from 'mitt'
import type Poller from "../poller";

jest.useFakeTimers()

jest.mock('../utils.ts', () => ({
getRandom: jest.fn()
}))

const mockEventBus: Emitter = {
emit: jest.fn(),
on: jest.fn(),
off: jest.fn(),
all: new Map()
}

const mockXHR = {
open: jest.fn(),
setRequestHeader: jest.fn(),
send: jest.fn(),
abort: jest.fn(),
status: 0,
responseText: '',
onload: null,
onerror: null,
onprogress: null,
onabort: null,
ontimeout: null
}

global.XMLHttpRequest = jest.fn(() => mockXHR) as unknown as jest.MockedClass<typeof XMLHttpRequest>

const logError = jest.fn()
const logDebug = jest.fn()

const getStreamer = (overrides: Partial<Options> = {}, maxRetries: number = Infinity): Streamer => {
const options: Options = {
baseUrl: 'http://test',
eventUrl: 'http://event',
pollingInterval: 60000,
debug: true,
pollingEnabled: true,
streamEnabled: true,
...overrides
}

return new Streamer(
mockEventBus,
options,
`${options.baseUrl}/stream`,
'test-api-key',
{ 'Test-Header': 'value' },
{ start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller,
logDebug,
logError,
jest.fn(),
maxRetries
)
}

describe('Streamer', () => {
beforeEach(() => {
jest.clearAllMocks()
})

it('should connect and emit CONNECTED event', () => {
const streamer = getStreamer({}, 3)

streamer.start()
expect(mockXHR.open).toHaveBeenCalledWith('GET', 'http://test/stream')
expect(mockXHR.send).toHaveBeenCalled()

mockXHR.onprogress({} as ProgressEvent)
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED)
})

it('should reconnect successfully after multiple failures', () => {
const streamer = getStreamer({}, 5)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

// Simulate a successful connection on the next attempt
mockXHR.onprogress({} as ProgressEvent)

expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED)
expect(mockXHR.send).toHaveBeenCalledTimes(4) // Should attempt to reconnect 3 times before succeeding
})

it('should retry connecting on error and eventually fallback to polling', () => {
const streamer = getStreamer()

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

expect(mockEventBus.emit).toHaveBeenCalledWith(Event.DISCONNECTED)
})

it('should not retry after max retries are exhausted', () => {
const streamer = getStreamer({}, 3)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

mockXHR.onerror({} as ProgressEvent)
expect(logError).toHaveBeenCalledWith('Streaming: Max streaming retries reached. Staying in polling mode.')
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.DISCONNECTED)
expect(mockXHR.send).toHaveBeenCalledTimes(3) // Should not send after max retries
})

it('should fallback to polling on stream failure', () => {
const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller
const streamer = new Streamer(
mockEventBus,
{ baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true },
'http://test/stream',
'test-api-key',
{ 'Test-Header': 'value' },
poller,
logDebug,
logError,
jest.fn(),
Infinity
)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))

expect(poller.start).toHaveBeenCalled()
expect(logDebug).toHaveBeenCalledWith('Streaming: Falling back to polling mode while stream recovers')
})

it('should stop polling when close is called if in fallback polling mode', () => {
const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller
;(poller.isPolling as jest.Mock)
.mockImplementationOnce(() => false)
.mockImplementationOnce(() => true)

const streamer = new Streamer(
mockEventBus,
{ baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true },
'http://test/stream',
'test-api-key',
{ 'Test-Header': 'value' },
poller,
logDebug,
logError,
jest.fn(),
3
)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

// Simulate stream failure and fallback to polling
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))

// Ensure polling has started
expect(poller.start).toHaveBeenCalled()

// Now close the streamer
streamer.close()

expect(mockXHR.abort).toHaveBeenCalled()
expect(poller.stop).toHaveBeenCalled()
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.STOPPED)
})

it('should stop streaming but not call poller.stop if not in fallback polling mode when close is called', () => {
const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn().mockReturnValue(false) } as unknown as Poller
const streamer = new Streamer(
mockEventBus,
{ baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true },
'http://test/stream',
'test-api-key',
{ 'Test-Header': 'value' },
poller,
logDebug,
logError,
jest.fn(),
3
)

streamer.start()
streamer.close()

expect(mockXHR.abort).toHaveBeenCalled()
expect(poller.stop).not.toHaveBeenCalled()
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.STOPPED)
})

it('should retry indefinitely if maxRetries is set to Infinity', () => {
const streamer = getStreamer()

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 100; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

expect(logError).not.toHaveBeenCalledWith('Streaming: Max streaming retries reached. Staying in polling mode.')
expect(mockXHR.send).toHaveBeenCalledTimes(101)
})

it('should reconnect successfully after multiple failures', () => {
const streamer = getStreamer({}, 5)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

// Simulate a successful connection on the next attempt
mockXHR.onprogress({} as ProgressEvent)

expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED)
expect(mockXHR.send).toHaveBeenCalledTimes(4) // Should attempt to reconnect 3 times before succeeding
})
})
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ const initialize = (apiKey: string, target: Target, options?: Options): Result =
} else if (event.domain === 'target-segment') {
handleSegmentEvent(event)
}
}
},
configurations.maxStreamRetries
)
eventSource.start()
}
Expand Down
22 changes: 20 additions & 2 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export class Streamer {
private readTimeoutCheckerId: any
private connectionOpened = false
private disconnectEventEmitted = false
private reconnectAttempts = 0
private reconnectAttempts = 0
private retriesExhausted: boolean = false

constructor(
private eventBus: Emitter,
Expand All @@ -22,7 +23,8 @@ export class Streamer {
private fallbackPoller: Poller,
private logDebug: (...data: any[]) => void,
private logError: (...data: any[]) => void,
private eventCallback: (e: StreamEvent) => void
private eventCallback: (e: StreamEvent) => void,
private maxRetries: number
) {}

start() {
Expand Down Expand Up @@ -60,10 +62,26 @@ export class Streamer {
)
}

if (this.reconnectAttempts >= this.maxRetries) {
this.retriesExhausted = true
if (this.configurations.pollingEnabled) {
this.logErrorMessage('Max streaming retries reached. Staying in polling mode.')
} else {
this.logErrorMessage(
'Max streaming retries reached. Polling mode is disabled and will receive no further flag updates until SDK client is restarted.'
)
}
return
}

setTimeout(() => this.start(), reconnectDelayMs)
}

const onFailed = (msg: string) => {
if (this.retriesExhausted) {
return
}

if (!!msg) {
this.logDebugMessage('Stream has issue', msg)
}
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ export interface Options {
* @default console
*/
logger?: Logger

/**
* By default, the stream will attempt to reconnect indefinitely if it disconnects. Use this option to limit
* the number of attempts it will make.
*/
maxStreamRetries?: number
}

export interface MetricsInfo {
Expand Down
3 changes: 2 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export const defaultOptions: Options = {
eventsSyncInterval: MIN_EVENTS_SYNC_INTERVAL,
pollingInterval: MIN_POLLING_INTERVAL,
streamEnabled: true,
cache: false
cache: false,
maxStreamRetries: Infinity
}

export const getConfiguration = (options: Options): Options => {
Expand Down

0 comments on commit 1e4218b

Please sign in to comment.