Skip to content

Commit

Permalink
FFM-8886 Add retries to polling and streaming for fetching flags/segm…
Browse files Browse the repository at this point in the history
…ents. Add configurable axios timeout
  • Loading branch information
erdirowlands committed Aug 4, 2023
1 parent cec86bb commit ba5f512
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 33 deletions.
48 changes: 48 additions & 0 deletions examples/lambda_debug/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const { Client } = require("@harnessio/ff-nodejs-server-sdk");

let client = null;

async function initialize(enableStream) {
const apiKey = "96088ba9-cf76-40df-b1f6-63481454944f";
client = new Client(apiKey, { enableStream: enableStream, pollInterval: 2 * 60 * 1000 });
await client.waitForInitialization();
}

async function getFeatureFlagValue(enableStream, { flagName, defaultValue, targetIdentifier, type = "boolean" }) {
try {
if (!client) await initialize(enableStream);

const target = { identifier: targetIdentifier };

switch (type) {
case "string":
return client.stringVariation(flagName, target, getDefaultValue({ defaultValue, type }));
case "number":
return client.numberVariation(flagName, target, getDefaultValue({ defaultValue, type }));
case "json":
return client.jsonVariation(flagName, target, getDefaultValue({ defaultValue, type }));
default:
return client.boolVariation(flagName, target, getDefaultValue({ defaultValue, type }));
}
} catch (error) {
console.error("Error getting feature flag value", error);
return getDefaultValue({ defaultValue, type });
}
}

function getDefaultValue({ defaultValue, type, useLocalDefault = false }) {
if (defaultValue !== undefined) return defaultValue;

switch (type) {
case "string":
return "";
case "number":
return 0;
case "json":
return {};
default:
return useLocalDefault;
}
}

module.exports = { getFeatureFlagValue };
157 changes: 157 additions & 0 deletions examples/lambda_debug/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions examples/lambda_debug/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "getting_started",
"version": "1.0.0",
"main": "index.js",
"dependencies": {
"@harnessio/ff-nodejs-server-sdk": "1.3.1"
}
}
4 changes: 2 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
warnMissingSDKKey,
} from './sdk_codes';

axios.defaults.timeout = 30000;
axiosRetry(axios, { retries: 3, retryDelay: axiosRetry.exponentialDelay });

enum Processor {
Expand Down Expand Up @@ -70,6 +69,7 @@ export default class Client {
this.sdkKey = sdkKey;
this.options = { ...defaultOptions, ...options };
this.log = this.options.logger;
axios.defaults.timeout = this.options.axiosTimeout;

if (options.pollInterval < defaultOptions.pollInterval) {
this.options.pollInterval = defaultOptions.pollInterval;
Expand Down Expand Up @@ -436,7 +436,7 @@ export default class Client {
);
return Promise.resolve(defaultValue);
}

return this.evaluator.jsonVariation(
identifier,
target,
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const defaultOptions: Options = {
eventsUrl: EVENTS_URL,
pollInterval: PULL_INTERVAL,
eventsSyncInterval: EVENTS_SYNC_INTERVAL,
axiosTimeout: 30000,
enableStream: true,
enableAnalytics: true,
cache: new LRU({ max: 100 }),
Expand Down
73 changes: 44 additions & 29 deletions src/polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,41 +88,56 @@ export class PollingProcessor {
});
}

private async retrieveFlags(): Promise<void> {
try {
this.log.debug('Fetching flags started');
const response = await this.api.getFeatureConfig(
this.environment,
this.cluster,
);
this.log.debug('Fetching flags finished');
response.data.forEach((fc: FeatureConfig) =>
this.repository.setFlag(fc.feature, fc),
);
} catch (error) {
this.log.error('Error loading flags', error);
throw error;
private async retrieveFlags(retries = 3): Promise<void> {
for (let i = 0; i < retries; i++) {
try {
this.log.debug('Fetching flags attempt ' + (i + 1));
const response = await this.api.getFeatureConfig(
this.environment,
this.cluster,
);
this.log.debug('Fetching flags finished');
response.data.forEach((fc: FeatureConfig) =>
this.repository.setFlag(fc.feature, fc),
);
// If successful, break the loop early
break;
} catch (error) {
// If this was the last attempt, rethrow the error
if (i === retries - 1) {
this.log.error('Error loading flags', error);
throw error;
}
}
}
}

private async retrieveSegments(): Promise<void> {
try {
this.log.debug('Fetching segments started');
const response = await this.api.getAllSegments(
this.environment,
this.cluster,
);
this.log.debug('Fetching segments finished');
// prepare cache for storing segments
response.data.forEach((segment: Segment) =>
this.repository.setSegment(segment.identifier, segment),
);
} catch (error) {
this.log.error('Error loading segments', error);
throw error;

private async retrieveSegments(retries = 3): Promise<void> {
for (let i = 0; i < retries; i++) {
try {
this.log.debug('Fetching segments attempt ' + (i + 1));
const response = await this.api.getAllSegments(
this.environment,
this.cluster,
);
this.log.debug('Fetching segments finished');
response.data.forEach((segment: Segment) =>
this.repository.setSegment(segment.identifier, segment),
);
// If successful, break the loop early
break;
} catch (error) {
// If this was the last attempt, rethrow the error
if (i === retries - 1) {
this.log.error('Error loading segments', error);
throw error;
}
}
}
}


start(): void {
if (!this.stopped) {
this.log.info('PollingProcessor already started');
Expand Down
30 changes: 28 additions & 2 deletions src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,21 +177,47 @@ export class StreamProcessor {
if (msg.domain === 'flag') {
this.msgProcessor(
msg,
this.api.getFeatureConfigByIdentifier.bind(this.api),
this.getFeatureConfigByIdentifierWithRetries(3),
this.repository.setFlag.bind(this.repository),
this.repository.deleteFlag.bind(this.repository),
);
} else if (msg.domain === 'target-segment') {
this.msgProcessor(
msg,
this.api.getSegmentByIdentifier.bind(this.api),
this.getSegmentByIdentifierWithRetries(3),
this.repository.setSegment.bind(this.repository),
this.repository.deleteSegment.bind(this.repository),
);
}
}
}

private getFeatureConfigByIdentifierWithRetries(maxRetries: number) {
for (let i = 0; i < maxRetries; i++) {
try {
return this.api.getFeatureConfigByIdentifier.bind(this.api);
} catch (error) {
console.log(`Attempt ${i + 1} failed. Retrying...`);
if (i === maxRetries - 1) {
throw error;
}
}
}
}

private getSegmentByIdentifierWithRetries(maxRetries: number) {
for (let i = 0; i < maxRetries; i++) {
try {
return this.api.getSegmentByIdentifier.bind(this.api);
} catch (error) {
console.log(`Attempt ${i + 1} failed. Retrying...`);
if (i === maxRetries - 1) {
throw error;
}
}
}
}

private async msgProcessor(
msg: StreamMsg,
fn: FetchFunction,
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface Options {
eventsUrl?: string;
pollInterval?: number;
eventsSyncInterval?: number;
axiosTimeout?: number,
enableStream?: boolean;
enableAnalytics?: boolean;
cache?: KeyValueStore;
Expand Down

0 comments on commit ba5f512

Please sign in to comment.