Skip to content

Commit

Permalink
#54: Added byte rollups for expanded data elements
Browse files Browse the repository at this point in the history
  • Loading branch information
darnjo committed Oct 3, 2023
1 parent 5e62676 commit 68a0b04
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 169 deletions.
142 changes: 69 additions & 73 deletions lib/replication/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,28 @@
const { replicationIterator, REPLICATION_STRATEGIES } = require('./replication-iterator');
const { scorePayload, writeDataAvailabilityReport, ERROR_TYPES } = require('./utils');
const { writeFile, mkdir } = require('fs/promises');
const { join } = require('path');
const humanizeDuration = require('humanize-duration');

const REPLICATION_DIRECTORY_NAME = 'reso-replication-output';

/**
*
* Parses the OData resource name from the given URI
*
* Example: https://some.api.com/v2/Property
*
* @param {String} requestUri the string for the OData request URI
* @returns OData resource name or null
*/
const parseResourceNameFromODataRequestUri = (requestUri = '') => {
try {
const [resourceName = null] = new URL(requestUri).pathname.split('/').slice(-1);
return resourceName;
} catch (err) {
console.error(err);
return null;
}
};

/**
* Builds a path to use when saving results
*
* @param {String} outputPath the target directory in which to save results (current by default)
* @param {*} resourceName the name of the Data Dictionary resource whose files are being saved
* @returns an operating system dependent path to save replication data with
*/
const buildOutputFilePath = (outputPath, resourceName) =>
join(outputPath, REPLICATION_DIRECTORY_NAME, resourceName, new Date().toISOString().replaceAll(':', '-'));
const { parseResourceNameFromODataRequestUri, buildOutputFilePath } = require('./utils');
const { join } = require('path');

/**
* Replicates data from the given OData request URL using the given strategy, credentials, and options
*
* @param {Object} args this function takes multiple parameters
* @returns this function has no return value, but will produce side effects if outputPath is used (will write files)
*/
const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, limit, expansions = ['Media' /* TODO */] }) => {
const replicate = async ({ url: initialRequestUri, strategy, bearerToken, outputPath, limit, expansions = ['Media' /* TODO */] }) => {
if (!Object.values(REPLICATION_STRATEGIES).includes(strategy)) {
throw new Error(`Unknown strategy: '${strategy}'!`);
}

const resourceName = parseResourceNameFromODataRequestUri(requestUri),
const resourceName = parseResourceNameFromODataRequestUri(initialRequestUri),
shouldSaveResults = !!outputPath,
resultsPath = shouldSaveResults ? buildOutputFilePath(outputPath, resourceName) : null;

const startTime = Date.now(),
responseTimes = [],
resourceAvailabilityMap = {};

let totalRecordCount = 0;

try {
for await (const {
hasResults = false,
Expand All @@ -68,9 +35,16 @@ const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, l
startTime = 0,
stopTime = 0,
totalRecordsFetched = 0,
pagesFetched = 0
} of replicationIterator({ requestUri, strategy, authInfo: { bearerToken } })) {
pagesFetched = 0,
requestUri,
responseBytes = 0
} of replicationIterator({ initialRequestUri, strategy, authInfo: { bearerToken }, limit })) {

// inner try in case we want to keep going on error
// for example, we get an error and can't parse one payload or response, but want
// to keep going until the error limit
try {
//handle errors
if (hasError) {
const { statusCode, message, errorType, error: errorData } = error;
if (errorType === ERROR_TYPES.HTTP) {
Expand All @@ -86,66 +60,88 @@ const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, l
}
}

//process results
if (hasResults) {
if (response?.value?.length) {
scorePayload({
requestUri,
records: response.value,
resourceAvailabilityMap, // mutated on each call
resourceName,
expansions,
responseTimeMs,
startTime,
stopTime
});

if (shouldSaveResults) {
await mkdir(resultsPath, { recursive: true });
await writeFile(join(resultsPath, `page-${pagesFetched}.json`), JSON.stringify(response));
}
//TODO: call schema validator if flag is set

scorePayload({
requestUri,
records: response.value,
resourceAvailabilityMap, // mutated on each call
resourceName,
expansions,
responseTimeMs,
startTime,
stopTime,
responseBytes
});

if (shouldSaveResults) {
await mkdir(resultsPath, { recursive: true });
await writeFile(join(resultsPath, `page-${pagesFetched}.json`), JSON.stringify(response));
}
}

totalRecordCount = totalRecordsFetched;
responseTimes.push(responseTimeMs);
if (!!limit && totalRecordsFetched >= limit) {
break;
}

} catch (err) {
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
}

if (!!limit && totalRecordsFetched >= limit) {
break;
}
}
} catch (err) {
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
} finally {

// try writing report
try {
await writeDataAvailabilityReport({ version: '1.7', resourceAvailabilityMap });
} catch (err) {
console.error(`Could not write data availability report! ${err}`);
}

console.log(`\nReplication completed in ${humanizeDuration(Date.now() - startTime, { round: false })}!`);
console.log(`\nRESO Replication Client completed in ${humanizeDuration(Date.now() - startTime, { round: false })}!`);

const runtimeStats = createRuntimeAvailabilityStats(resourceAvailabilityMap);
console.log(
`Total requests: ${responseTimes?.length || 0}, Average response time: ${humanizeDuration(
parseInt(
responseTimes?.reduce((acc, item) => {
if (item) {
acc += item;
}
return acc;
}, 0) / (responseTimes.length || 1)
)
)}, Total records: ${totalRecordCount}\n`
`Total requests: ${runtimeStats.totalRequests}, ` +
`Average response time: ${humanizeDuration(runtimeStats.totalResponseTimeMs / (runtimeStats.totalRequests || 1))}, ` +
`Records fetched: ${runtimeStats.totalRecordCount}${runtimeStats.expandedRecordCount
? `, Expanded records: ${runtimeStats.expandedRecordCount}` : ''}\n`
);
}
return;
};

const createRuntimeAvailabilityStats = (resourceAvailabilityMap = {}) =>
Object.entries(resourceAvailabilityMap).reduce(
(acc, [, { isExpansion = false, responses = [], numRecordsFetched = 0 }]) => {

if (isExpansion) {
acc.expandedRecordCount += numRecordsFetched;
} else {
acc.totalRecordCount += numRecordsFetched;
}

responses.forEach(({ responseTimeMs = 0 }) => {
if (!isExpansion) {
acc.totalRequests++;
acc.totalResponseTimeMs += responseTimeMs;
}
return acc;
});
return acc;
},
{
totalRequests: 0,
totalResponseTimeMs: 0,
totalRecordCount: 0,
expandedRecordCount: 0
}
);

module.exports = {
replicate
};
52 changes: 39 additions & 13 deletions lib/replication/replication-iterator.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
'use strict';

const queryString = require('node:querystring');
const { processHttpErrorResponse, ERROR_TYPES } = require('./utils');
const { processHttpErrorResponse, ERROR_TYPES, calculateJsonSize } = require('./utils');
const humanizeDuration = require('humanize-duration');

const MAX_RECORD_COUNT_DEFAULT = 100000,
DEFAULT_PAGE_SIZE = 100,
const DEFAULT_PAGE_SIZE = 100,
ODATA_VALUE_PROPERTY_NAME = 'value',
ODATA_NEXT_LINK_PROPERTY_NAME = '@odata.nextLink';

Expand All @@ -31,7 +30,35 @@ const REPLICATION_STRATEGIES = Object.freeze({
* @param {String} token bearer token to be used for a given HTTP request
* @returns a header constructed from the given token, or an empty object if the token is invalid
*/
const getBearerTokenAuthHeader = (token = '') => (token?.length ? { Authorization: `Bearer ${token}` } : {});
const getOAuth2BearerTokenHeader = ({ bearerToken = '', clientCredentials = {} }) => {
let authToken;

if (bearerToken?.length) {
authToken = bearerToken;
} else if (!!clientCredentials && Object.values(clientCredentials)?.length) {
authToken = getClientCredentialsAuthToken(clientCredentials);
}

return { Authorization: `Bearer ${authToken}` };
};

/**
* Fetches an auth token from the given tokenUri and other authInfo
*
* TODO: implement
*
* @param {Object} authInfo contains the OAuth2 Client Credentials info to get an auth token
* @returns an Authorization header with the token
*/
const getClientCredentialsAuthToken = ({ tokenUri, clientId, clientSecret, scope }) => {
console.log(
`tokenUri is :${tokenUri}, clientId (last4): ${clientId?.slice(-4)}, clientSecret (last4): ${clientSecret?.slice(-4)}, scope: ${scope}`
);

//TODO get token
const token = 'test-token';
return getOAuth2BearerTokenHeader(token);
};

const buildRequestUri = ({ requestUri, strategy, totalRecordsFetched = 0, pageSize, /* TODO lastIsoTimestamp, */ nextLink }) => {
const [baseUri = null, query = null] = requestUri.split('?');
Expand Down Expand Up @@ -67,11 +94,7 @@ const buildRequestUri = ({ requestUri, strategy, totalRecordsFetched = 0, pageSi
* @param {Object} config configuration for the replication iterator
* @returns yields with a number of parameters relevant to replication
*/
async function* replicationIterator(config = {}) {
const { requestUri: initialRequestUri = '', maxErrorCount = 3, authInfo = {}, strategy } = config;

const { bearerToken /* TODO clientCredentials */ } = authInfo;

async function* replicationIterator({ initialRequestUri = '', maxErrorCount = 3, authInfo = {}, strategy }) {
let pageSize = DEFAULT_PAGE_SIZE,
pagesFetched = 0,
numErrors = 0,
Expand All @@ -85,7 +108,7 @@ async function* replicationIterator(config = {}) {

// TODO: handle client credentials auth
const headers = {
...getBearerTokenAuthHeader(bearerToken)
...getOAuth2BearerTokenHeader(authInfo)
};

if (strategy === REPLICATION_STRATEGIES.NEXT_LINK) {
Expand All @@ -96,6 +119,7 @@ async function* replicationIterator(config = {}) {

do {
let responseJson = null,
responseBytes = 0,
responseStatus = 0,
error = null;

Expand Down Expand Up @@ -136,6 +160,7 @@ async function* replicationIterator(config = {}) {
lastRequestUri = requestUri;
responseStatus = response.status;
responseJson = await response.json();
responseBytes = calculateJsonSize(responseJson);

//process records
if (response.ok) {
Expand All @@ -155,7 +180,7 @@ async function* replicationIterator(config = {}) {
} else {
stopTime = new Date().toISOString();
error = {
errorType: 'http',
errorType: ERROR_TYPES.HTTP,
...processHttpErrorResponse(response)
};
numErrors++;
Expand All @@ -182,9 +207,10 @@ async function* replicationIterator(config = {}) {
pagesFetched,
numErrors,
error,
hasError: !!error
hasError: !!error,
responseBytes
};
} while (pageSize > 0 && totalRecordsFetched < MAX_RECORD_COUNT_DEFAULT && numErrors < maxErrorCount);
} while (pageSize > 0 && numErrors <= maxErrorCount);
}

/**
Expand Down
Loading

0 comments on commit 68a0b04

Please sign in to comment.