Skip to content

Commit

Permalink
#54: Added support for median and std dev for times and bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
darnjo committed Oct 4, 2023
1 parent 68a0b04 commit 6a65abf
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 81 deletions.
64 changes: 7 additions & 57 deletions lib/replication/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
'use strict';

const { replicationIterator, REPLICATION_STRATEGIES } = require('./replication-iterator');
const { scorePayload, writeDataAvailabilityReport, ERROR_TYPES } = require('./utils');
const { DEFAULT_DD_VERSION, NOT_OK, handleError, scorePayload, writeDataAvailabilityReport, displayRuntimeInfo } = require('./utils');
const { writeFile, mkdir } = require('fs/promises');
const humanizeDuration = require('humanize-duration');
const { parseResourceNameFromODataRequestUri, buildOutputFilePath } = require('./utils');
const { join } = require('path');

Expand Down Expand Up @@ -39,25 +38,10 @@ const replicate = async ({ url: initialRequestUri, strategy, bearerToken, output
requestUri,
responseBytes = 0
} of replicationIterator({ initialRequestUri, strategy, authInfo: { bearerToken }, limit })) {

// inner try in case we want to keep going on error
// for example, we get an error and can't parse one payload or response, but want
// to keep going until the error limit
try {
//handle errors
if (hasError) {
const { statusCode, message, errorType, error: errorData } = error;
if (errorType === ERROR_TYPES.HTTP) {
console.error(`HTTP request error! Status code: ${statusCode}, message: '${message}'`);
} else {
let errorString = null;
try {
errorString = JSON.stringify(JSON.parse(errorData));
} catch (err) {
errorString = err?.toString ? err.toString() : err.cause || '<unknown>';
}
throw new Error(`${errorType} error occurred! Error: ${errorString}`);
}
handleError(error);
}

//process results
Expand Down Expand Up @@ -85,63 +69,29 @@ const replicate = async ({ url: initialRequestUri, strategy, bearerToken, output
if (!!limit && totalRecordsFetched >= limit) {
break;
}

} catch (err) {
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
return NOT_OK;
}
}
} catch (err) {
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
return NOT_OK;
} finally {

try {
await writeDataAvailabilityReport({ version: '1.7', resourceAvailabilityMap });
await writeDataAvailabilityReport({ version: DEFAULT_DD_VERSION, resourceAvailabilityMap });
} catch (err) {
console.error(`Could not write data availability report! ${err}`);
}

console.log(`\nRESO Replication Client completed in ${humanizeDuration(Date.now() - startTime, { round: false })}!`);

const runtimeStats = createRuntimeAvailabilityStats(resourceAvailabilityMap);
console.log(
`Total requests: ${runtimeStats.totalRequests}, ` +
`Average response time: ${humanizeDuration(runtimeStats.totalResponseTimeMs / (runtimeStats.totalRequests || 1))}, ` +
`Records fetched: ${runtimeStats.totalRecordCount}${runtimeStats.expandedRecordCount
? `, Expanded records: ${runtimeStats.expandedRecordCount}` : ''}\n`
);
displayRuntimeInfo({ version: DEFAULT_DD_VERSION, startTime, resourceAvailabilityMap });
}
return;
};

const createRuntimeAvailabilityStats = (resourceAvailabilityMap = {}) =>
Object.entries(resourceAvailabilityMap).reduce(
(acc, [, { isExpansion = false, responses = [], numRecordsFetched = 0 }]) => {

if (isExpansion) {
acc.expandedRecordCount += numRecordsFetched;
} else {
acc.totalRecordCount += numRecordsFetched;
}

responses.forEach(({ responseTimeMs = 0 }) => {
if (!isExpansion) {
acc.totalRequests++;
acc.totalResponseTimeMs += responseTimeMs;
}
return acc;
});
return acc;
},
{
totalRequests: 0,
totalResponseTimeMs: 0,
totalRecordCount: 0,
expandedRecordCount: 0
}
);

module.exports = {
replicate
};
2 changes: 1 addition & 1 deletion lib/replication/replication-iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const getOAuth2BearerTokenHeader = ({ bearerToken = '', clientCredentials = {} }

if (bearerToken?.length) {
authToken = bearerToken;
} else if (!!clientCredentials && Object.values(clientCredentials)?.length) {
} else if (Object.values(clientCredentials)?.length) {
authToken = getClientCredentialsAuthToken(clientCredentials);
}

Expand Down
154 changes: 131 additions & 23 deletions lib/replication/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const { writeFile } = require('fs/promises');
const { join } = require('path');
const humanizeDuration = require('humanize-duration');

const REPLICATION_DIRECTORY_NAME = 'reso-replication-output';

Expand All @@ -10,6 +11,10 @@ const ERROR_TYPES = {
GENERAL: 'general'
};

const NOT_OK = 1;

const DEFAULT_DD_VERSION = '1.7', POSTAL_CODE_FIELD_NAME = 'PostalCode';

/**
* Scores a payload with the given data
* @param {Object} data options to be extracted from the caller
Expand All @@ -27,7 +32,8 @@ const scorePayload = ({
isExpansion = false,
expansions = [],
responseTimeMs,
responseBytes = 0
responseBytes = 0,
dateField = 'ModificationTimestamp'
}) => {
// init if the resource doesn't exist
if (!resourceAvailabilityMap?.[resourceName]) {
Expand All @@ -38,15 +44,16 @@ const scorePayload = ({
isExpansion,
recordCount: 0,
pageSize: records?.length ?? 0,
dateLow: null,
dateHigh: null,
// TODO: allow passing the date field name
dateField: 'ModificationTimestamp',
dateField,
dateHigh: null,
dateLow: null,
// field availability map is fieldName and frequency
fieldAvailabilityMap: {}
};
}

// update legacy property for current reports
resourceAvailabilityMap[resourceName].numRecordsFetched += records?.length || 0;

//init if not present
Expand All @@ -63,7 +70,6 @@ const scorePayload = ({
};
}
responseInfo.responseBytes = responseBytes;

resourceAvailabilityMap[resourceName].responses.push(responseInfo);

records.forEach(record => {
Expand All @@ -87,37 +93,53 @@ const scorePayload = ({
resourceAvailabilityMap[resourceName].fieldAvailabilityMap[fieldName].frequency++;

// Update resource max and min dates
if (fieldName === resourceAvailabilityMap?.[resourceName]?.dateField) {
if (fieldName === dateField) {
const dateValue = new Date(value),
currentDateLowValue = resourceAvailabilityMap[resourceName].dateLow,
currentDateHighValue = resourceAvailabilityMap[resourceName].dateHigh;

if (currentDateLowValue) {
resourceAvailabilityMap[resourceName].dateLow = new Date(Math.min(new Date(currentDateLowValue), dateValue)).toISOString();
currentDateLowValue = resourceAvailabilityMap?.[resourceName]?.dateLow
? new Date(resourceAvailabilityMap[resourceName].dateLow)
: null,
currentDateHighValue = resourceAvailabilityMap?.[resourceName]?.dateHigh
? new Date(resourceAvailabilityMap[resourceName].dateHigh)
: null;

if (dateValue) {
if (currentDateLowValue) {
resourceAvailabilityMap[resourceName].dateLow = new Date(Math.min(currentDateLowValue, dateValue)).toISOString();
} else {
resourceAvailabilityMap[resourceName].dateLow = dateValue.toISOString();
}

if (currentDateHighValue) {
resourceAvailabilityMap[resourceName].dateHigh = new Date(Math.max(currentDateHighValue, dateValue)).toISOString();
} else {
resourceAvailabilityMap[resourceName].dateHigh = dateValue.toISOString();
}
}
}

if (currentDateHighValue) {
resourceAvailabilityMap[resourceName].dateLow = new Date(Math.max(new Date(currentDateHighValue), dateValue)).toISOString();
if (fieldName === POSTAL_CODE_FIELD_NAME) {
if (!resourceAvailabilityMap?.[resourceName]?.postalCodes) {
resourceAvailabilityMap[resourceName].postalCodes = new Set();
}
resourceAvailabilityMap[resourceName].postalCodes.add(value);
}

// TODO: Enumerations

// process expansions, if present
if (expansions?.includes(fieldName)) {
// TODO: look up the resource name for the expanded field and determine whether it's a collection or not
// for now, just use Media

// TODO: implement without recursion
scorePayload({
requestUri,
// record can either be an array or single value - detect and process accordingly
records: isArray ? value : [value],
resourceAvailabilityMap,
parentResourceName: resourceName,
resourceName: 'Media',
isExpansion: true,
isCollection: isArray,
expansions,
// need to calculate the response size for expansions separately from parent
responseBytes: calculateJsonSize(value)
});
}
Expand All @@ -131,6 +153,11 @@ const scorePayload = ({
});
};

/**
* Determines whether the given value is an object, array, or primitive
* @param {Object} value to test
* @returns object with misc. properties set
*/
const isObjectOrArrayOrPrimitive = value => {
const isObject = typeof value === 'object',
isArray = Array.isArray(value),
Expand Down Expand Up @@ -209,25 +236,28 @@ const consolidateResults = (resourceAvailabilityMap = {}) =>
averageResponseTimeMillis: calculateMean(responseTimeValues) ?? 0
};

const { isExpansion = false, responses = [], recordCount = 0, ...rest } = remainingResourceData;
const { isExpansion = false, responses = [], recordCount = 0, postalCodes: postalCodesSet = new Set(), ...rest } = remainingResourceData;
const postalCodes = postalCodesSet?.size ? Array.from(postalCodesSet) : undefined;
if (isExpansion) {
const { numRecordsFetched = 0, medianBytes = 0, stdDevBytes = 0, averageResponseBytes = 0 } = resourceStats;
// do not include time-based stats for expansions since it's that of their parent resource
const { numRecordsFetched = 0, medianBytes = 0, stdDevBytes = 0, averageResponseBytes = 0 } = resourceStats;
acc.resources.push({
...rest,
isExpansion,
numRecordsFetched,
medianBytes,
stdDevBytes,
averageResponseBytes
averageResponseBytes,
postalCodes
});
} else {
acc.resources.push({
...rest,
...resourceStats,
// only add responses for non-expanded items
responses,
recordCount
recordCount,
postalCodes
});
}

Expand All @@ -243,6 +273,37 @@ const consolidateResults = (resourceAvailabilityMap = {}) =>
}
);

/**
* Creates a pleasing report when running on the command line
* @param {Object} resourceAvailabilityMap availability stats collected during run
* @returns high-level stats for reporting
*/
const createRuntimeAvailabilityStats = (resourceAvailabilityMap = {}) =>
Object.entries(resourceAvailabilityMap).reduce(
(acc, [, { isExpansion = false, responses = [], numRecordsFetched = 0 }]) => {
if (isExpansion) {
acc.expandedRecordCount += numRecordsFetched;
} else {
acc.totalRecordCount += numRecordsFetched;
}

responses.forEach(({ responseTimeMs = 0 }) => {
if (!isExpansion) {
acc.totalRequests++;
acc.totalResponseTimeMs += responseTimeMs;
}
return acc;
});
return acc;
},
{
totalRequests: 0,
totalResponseTimeMs: 0,
totalRecordCount: 0,
expandedRecordCount: 0
}
);

/**
* Writes a data-availability-report.json file for the given version and availability data
*
Expand Down Expand Up @@ -272,6 +333,24 @@ const writeDataAvailabilityReport = async ({ version, resourceAvailabilityMap =
}
};

/**
* Displays info on the console when a job is run
* @param {Object} resourceAvailabilityMap map from availability testing run
*/
const displayRuntimeInfo = ({ startTime, version, resourceAvailabilityMap = {} }) => {
console.log(
`\nRESO Replication Client completed in ${humanizeDuration(Date.now() - startTime, { round: false })}! DD Version: ${version}`
);
const runtimeStats = createRuntimeAvailabilityStats(resourceAvailabilityMap);
console.log(
`Total requests: ${runtimeStats.totalRequests}, ` +
`Average response time: ${humanizeDuration(runtimeStats.totalResponseTimeMs / (runtimeStats.totalRequests || 1))}, ` +
`Records fetched: ${runtimeStats.totalRecordCount}${
runtimeStats.expandedRecordCount ? `, Expanded records: ${runtimeStats.expandedRecordCount}` : ''
}\n`
);
};

/**
* Processes an HTTP error response from the Fetch API
* @param {Response} response the HTTP error response from the Fetch API
Expand Down Expand Up @@ -306,6 +385,11 @@ const parseResourceNameFromODataRequestUri = (requestUri = '') => {
}
};

/**
* Calculates the size of the given JSON Data
* @param {Object} jsonData JSON data
* @returns size of given JSON Data or 0 if it couldn't be processed
*/
const calculateJsonSize = jsonData => {
try {
return Buffer.byteLength(JSON.stringify(jsonData));
Expand Down Expand Up @@ -374,13 +458,37 @@ const calculateMedian = (numbers = []) => {
return sorted[middle];
};


/**
* Handles error objects
* @param {Object} error the error to process
*/
const handleError = error => {
const { statusCode, message, errorType, error: errorData } = error;
if (errorType === ERROR_TYPES.HTTP) {
console.error(`HTTP request error! Status code: ${statusCode}, message: '${message}'`);
} else {
let errorString = null;
try {
errorString = JSON.stringify(JSON.parse(errorData));
} catch (err) {
errorString = err?.toString ? err.toString() : err.cause || '<unknown>';
}
throw new Error(`${errorType} error occurred! Error: ${errorString}`);
}
};

module.exports = {
NOT_OK,
ERROR_TYPES,
REPLICATION_DIRECTORY_NAME,
DEFAULT_DD_VERSION,
scorePayload,
writeDataAvailabilityReport,
processHttpErrorResponse,
ERROR_TYPES,
REPLICATION_DIRECTORY_NAME,
parseResourceNameFromODataRequestUri,
buildOutputFilePath,
calculateJsonSize
calculateJsonSize,
displayRuntimeInfo,
handleError
};

0 comments on commit 6a65abf

Please sign in to comment.