Skip to content

Commit

Permalink
#54: Added data availability report and ability to save data
Browse files Browse the repository at this point in the history
  • Loading branch information
darnjo committed Sep 28, 2023
1 parent 843e36c commit b7acc1d
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 57 deletions.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ if (require?.main === module) {
.option('-x, --expansions <string>', 'Items to expand during the query process, e.g. Media')
.option('-m, --metadataReportPath <string>', 'Path to metadata report to use for replication')
.option('-o, --outputPath <string>', 'Name of directory for results')
.option('-l, --limit <number>', 'Limit for total number of records')
.action(replicate);

program
Expand Down
40 changes: 28 additions & 12 deletions lib/replication/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,35 @@ const { scorePayload, writeDataAvailabilityReport } = require('./utils');
const humanizeDuration = require('humanize-duration');
const { writeFile, mkdir } = require('fs/promises');

const replicate = async ({ url: requestUri, strategy, bearerToken, expansions, outputPath }) => {
// need to get the last part of the URL before the querystring
const parseResourceNameFromODataRequestUri = (requestUri = '') => {
try {
const [ resourceName = null ] = (new URL(requestUri))?.pathname?.split('/')?.slice(-1);
return resourceName;
} catch (err) {
console.error(err);
return null;
}
};

const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, limit }) => {
if (!Object.values(REPLICATION_STRATEGIES).includes(strategy)) {
throw new Error(`Unknown strategy: '${strategy}'!`);
}

const config = {
requestUri,
strategy: strategy,
strategy,
authInfo: {
bearerToken
},
expansions
}
};

const inferredResourceName = 'Lookup',
shouldSaveResults = !!outputPath;
const resourceName = parseResourceNameFromODataRequestUri(requestUri),
shouldSaveResults = !!outputPath,
resultsPath = shouldSaveResults
? `${outputPath}/reso-replication-output/${resourceName}-${new Date().toISOString().replaceAll(':', '-')}`
: null;

const startTime = Date.now(),
responseTimes = [];
Expand All @@ -32,19 +45,22 @@ const replicate = async ({ url: requestUri, strategy, bearerToken, expansions, o
try {
for await (const data of replicationIterator(config)) {
if (data?.hasResults) {
console.log('Data fetched!');
responseTimes.push(data?.responseTimeMs ?? 0);
recordsFetched = data?.recordsFetched ?? 0;

if (data?.response?.value?.length) {
availabilityMap = scorePayload(data.response.value, availabilityMap);
availabilityMap = scorePayload(data.response.value, availabilityMap, resourceName);

if (shouldSaveResults) {
await mkdir(outputPath, { recursive: true });
await writeFile(`${outputPath}/${inferredResourceName}-${++pagesFetched}.json`, JSON.stringify(data.response));
await mkdir(resultsPath, { recursive: true });
await writeFile(`${resultsPath}/page-${++pagesFetched}.json`, JSON.stringify(data.response));
}
}
}

if (!!limit && recordsFetched >= limit) {
break;
}
}

console.log(`\nReplication completed in ~${humanizeDuration(Date.now() - startTime, { round: true })}!`);
Expand All @@ -55,11 +71,11 @@ const replicate = async ({ url: requestUri, strategy, bearerToken, expansions, o
acc += item;
}
return acc;
}, 0) / responseTimes.length
}, 0) / (responseTimes.length || 1)
)}ms, Total records: ${recordsFetched}\n`
);

await writeDataAvailabilityReport(availabilityMap, recordsFetched);
await writeDataAvailabilityReport(resourceName, availabilityMap, recordsFetched);
} catch (err) {
console.error(err);
}
Expand Down
82 changes: 50 additions & 32 deletions lib/replication/replication-iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@ const queryString = require('node:querystring');

const MAX_RECORD_COUNT_DEFAULT = 100000,
DEFAULT_PAGE_SIZE = 100,
ODATA_VALUE_PROPERTY_NAME = 'value';
ODATA_VALUE_PROPERTY_NAME = 'value',
ODATA_NEXT_LINK_PROPERTY_NAME = '@odata.nextLink';

// See: https://docs.oasis-open.org/odata/odata/v4.01/odata-v4.01-part1-protocol.html#sec_HeaderPrefer
// The value of the Prefer header is a comma-separated list of preferences.
const ODATA_PREFER_HEADER_NAME = 'Prefer', ODATA_MAX_PAGE_SIZE_HEADER_NAME = 'odata.maxpagesize';

// See: https://docs.oasis-open.org/odata/odata/v4.01/odata-v4.01-part1-protocol.html#_Toc31358888
// The value of the Preference-Applied header is a comma-separated list of preferences applied in the response.
const ODATA_PREFERENCE_APPLIED_HEADER_NAME = 'Preference-Applied';

const REPLICATION_STRATEGIES = Object.freeze({
TOP_AND_SKIP: 'TopAndSkip',
Expand All @@ -15,13 +24,13 @@ const REPLICATION_STRATEGIES = Object.freeze({

const getBearerTokenAuthHeader = (token = '') => (token?.length ? { Authorization: `Bearer ${token}` } : {});

const buildRequestUri = ({ requestUri, strategy, recordsFetched = 0, lastPageSize /* lastIsoTimestamp, nextLink */ }) => {
const buildRequestUri = ({ requestUri, strategy, recordsFetched = 0, pageSize, lastIsoTimestamp, nextLink }) => {
const [baseUri = null, query = null] = requestUri.split('?');

const queryParams = query !== null ? queryString.parse(query) : {};

if (strategy === REPLICATION_STRATEGIES.TOP_AND_SKIP) {
const { $top: top = lastPageSize ?? DEFAULT_PAGE_SIZE, ...remainingParams } = queryParams;
const { $top = pageSize ?? DEFAULT_PAGE_SIZE, ...remainingParams } = queryParams;

//$skip param from queryParams is always ignored
delete remainingParams.$skip;
Expand All @@ -33,83 +42,92 @@ const buildRequestUri = ({ requestUri, strategy, recordsFetched = 0, lastPageSiz
} else if (strategy === REPLICATION_STRATEGIES.TIMESTAMP_DESC) {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
} else if (strategy === REPLICATION_STRATEGIES.NEXT_LINK) {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
return !!nextLink ? nextLink : requestUri;
} else {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
}
};

async function* replicationIterator(config = {}) {
const {
requestUri: initialRequestUri = '',
maxErrorCount = 3,
authInfo = {},
strategyInfo = { strategy: 'TopAndSkip', pageSize: DEFAULT_PAGE_SIZE }
} = config;
const { requestUri: initialRequestUri = '', maxErrorCount = 3, authInfo = {}, strategy } = config;

const { bearerToken /* clientCredentials */ } = authInfo;

const headers = {
...getBearerTokenAuthHeader(bearerToken)
};
const { bearerToken, clientCredentials } = authInfo;

let successfulRequestCount = 0,
errorRequestCount = 0,
recordsFetched = 0,
lastPageSize = DEFAULT_PAGE_SIZE;
pageSize = DEFAULT_PAGE_SIZE;

//GET https://api.reso.org/Property
let requestUri = initialRequestUri,
lastRequestUri = null;
lastRequestUri = null,
lastIsoTimestamp = null,
nextLink = null;

console.log('Request uri is: ' + requestUri);
const headers = {
...getBearerTokenAuthHeader(bearerToken)
};

if (strategy === REPLICATION_STRATEGIES.NEXT_LINK) {
headers[ODATA_PREFER_HEADER_NAME] = `${ODATA_MAX_PAGE_SIZE_HEADER_NAME}=${pageSize}`;
}

console.log(`Initial request uri: ${requestUri}\n`);

do {
let responseJson = null,
responseStatus = 0,
error = null;
//lastIsoTimestamp = null,
//nextLink = null;

requestUri = buildRequestUri({
requestUri,
strategy: strategyInfo.strategy,
strategy,
recordsFetched,
lastPageSize
//lastIsoTimestamp,
//nextLink
pageSize,
lastIsoTimestamp,
nextLink
});

if (requestUri === lastRequestUri) {
throw new Error(`Same URLs found for consecutive requests!\n\tRequestUri: ${requestUri}\n\tLastRequestUri: ${lastRequestUri}`);
console.error(`Same URL found for consecutive requests: ${requestUri}\nExiting...`);
return;
}

let responseTimeMs = 0,
startTime;

try {

//request records
console.log(`Fetching records from '${requestUri}'...`);
startTime = Date.now();
const response = await fetch(requestUri, { headers });
responseTimeMs = Date.now() - startTime;

//set state
lastRequestUri = requestUri;
responseStatus = response.status;
responseJson = await response.json();

//process records
if (response.ok) {
lastPageSize = responseJson[`${ODATA_VALUE_PROPERTY_NAME}`]?.length ?? 0;
recordsFetched += lastPageSize;
pageSize = responseJson[ODATA_VALUE_PROPERTY_NAME]?.length ?? 0;
nextLink = responseJson[ODATA_NEXT_LINK_PROPERTY_NAME] ?? null;
recordsFetched += pageSize;

if (lastPageSize) {
if (pageSize) {
console.log(
`Request succeeded! Time taken: ${responseTimeMs} ms. Records fetched: ${lastPageSize}. ` +
`Request succeeded! Time taken: ${responseTimeMs}ms. Records fetched: ${pageSize}. ` +
`Total records fetched: ${recordsFetched}\n`
);
} else {
console.log('No records to fetch!');
}

//if the response was OK, the request was successful even if no records
successfulRequestCount++;
} else {
//TODO: when there's an unsuccessful request, sometimes the error message is in the response body
console.error(`${JSON.stringify(responseJson)}\n`);
errorRequestCount++;
error = response?.statusText ?? null;
Expand All @@ -125,14 +143,14 @@ async function* replicationIterator(config = {}) {
responseStatus,
responseTimeMs,
response: responseJson,
hasResults: lastPageSize > 0,
lastPageSize,
hasResults: pageSize > 0,
pageSize,
recordsFetched,
successfulRequestCount,
errorRequestCount,
error
};
} while (lastPageSize > 0 && recordsFetched < MAX_RECORD_COUNT_DEFAULT && errorRequestCount < maxErrorCount);
} while (pageSize > 0 && recordsFetched < MAX_RECORD_COUNT_DEFAULT && errorRequestCount < maxErrorCount);
}

/**
Expand Down
26 changes: 13 additions & 13 deletions lib/replication/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,32 @@ const { writeFile } = require('fs/promises');
}
*/
const scorePayload = (records = [], availabilityMap = {}) => {
records.forEach(r => {
Object.entries(r).forEach(([k, v]) => {
if (!availabilityMap?.[k]) {
availabilityMap[k] = 0;
const scorePayload = (records = [], availabilityMap = {}, resourceName, expansions = []) => {
records.forEach(record => {
Object.entries(record).forEach(([fieldName, value]) => {
if (!availabilityMap?.[fieldName]) {
availabilityMap[fieldName] = 0;
}

if (v) {
availabilityMap[k]++;
if (!!value && (typeof value !== 'object' || Object.values(value)?.length)) {
availabilityMap[fieldName]++;
}
});
});

return availabilityMap;
};

const consolidateResults = (availabilityMap = {}, totalRecordCount = 0) => {
const consolidateResults = (resourceName, availabilityMap = {}, numRecordsFetched) => {
return {
totalRecordCount,
fields: Object.entries(availabilityMap ?? {}).map(([k, v]) => {
return { fieldName: k, frequency: v };
resources: [{ resourceName, numRecordsFetched }],
fields: Object.entries(availabilityMap ?? {}).map(([fieldName, frequency]) => {
return { resourceName, fieldName, frequency };
})
};
};

const writeDataAvailabilityReport = async (availabilityMap = {}, totalRecordCount = 0) => {
const writeDataAvailabilityReport = async (resourceName, availabilityMap = {}, totalRecordCount = 0) => {
const AVAILABILITY_REPORT_FILENAME = 'data-availability-report.json';

try {
Expand All @@ -67,7 +67,7 @@ const writeDataAvailabilityReport = async (availabilityMap = {}, totalRecordCoun
description: 'RESO Data Availability Report',
version: '1.7',
generatedOn: new Date().toISOString(),
fields: consolidateResults(availabilityMap, totalRecordCount)
...consolidateResults(resourceName, availabilityMap, totalRecordCount)
},
null,
' '
Expand Down
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"node": ">=18.0.0"
},
"dependencies": {
"-": "^0.0.1",
"aws-sdk": "^2.1354.0",
"axios": "^0.24.0",
"chalk": "^4.1.2",
Expand Down

0 comments on commit b7acc1d

Please sign in to comment.