Skip to content

Commit

Permalink
#54: Added response info and deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
darnjo committed Oct 22, 2023
1 parent 03fd1d4 commit 8f2b978
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 54 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ dist
.DS_Store
.vscode
output
data-availability-report.json
metadata-report.json

# RESO testing stuff
data-availability*.json
metadata-report*.json
reso-replication-output
20 changes: 14 additions & 6 deletions lib/replication/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,16 @@ const replicate = async ({

const startTime = new Date(),
startTimeIsoTimestamp = startTime.toISOString(),
shouldSaveResults = !!outputPath,
resourceAvailabilityMap = {};
shouldSaveResults = !!outputPath;

// mutated on each call
const STATE_VARIABLES = {
resourceAvailabilityMap: {},
expansionAvailabilityMap: {},
// used to detect whether we've pulled the same record using different strategies
recordHashCountMap: {},
responses: []
};

// Each resource and expansion will have its separate set of requests
for await (const request of requests) {
Expand Down Expand Up @@ -105,9 +113,9 @@ const replicate = async ({
...request,
requestUri,
jsonData: responseJson,
resourceAvailabilityMap, // mutated on each call
hasError,
...otherIteratorInfo
...otherIteratorInfo,
...STATE_VARIABLES,
});
}

Expand Down Expand Up @@ -149,13 +157,13 @@ const replicate = async ({

if (shouldGenerateReports) {
try {
await writeDataAvailabilityReport({ version: DEFAULT_DD_VERSION, resourceAvailabilityMap });
await writeDataAvailabilityReport({ version: DEFAULT_DD_VERSION, ...STATE_VARIABLES });
} catch (err) {
console.error(`Could not write data availability report! ${err}`);
}
}

displayRuntimeInfo({ version: DEFAULT_DD_VERSION, startTime, resourceAvailabilityMap });
displayRuntimeInfo({ version: DEFAULT_DD_VERSION, startTime, ...STATE_VARIABLES });

return;
};
Expand Down
102 changes: 56 additions & 46 deletions lib/replication/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,21 @@ const extractValues = (jsonData = {}) => {
const scorePayload = ({
requestUri,
jsonData,
// TODO: consider not mutating
resourceAvailabilityMap = {},
resourceAvailabilityMap = {}, /* TODO: consider not mutating */
recordHashCountMap = {}, /* TODO: consider not mutating */
responses = [],
resourceName,
responseTimeMs,
responseBytes,
dateField = 'ModificationTimestamp',
expansionInfo,
hasError
hasError,
strategy
}) => {
const records = extractValues(jsonData);

// return if there's nothing to process
if (!(!!records && Array.isArray(records) && records?.length)) return;
if (!(records && Array.isArray(records) && records?.length)) return;

// add item to resource availability map if not present
if (!resourceAvailabilityMap?.[resourceName]) {
Expand All @@ -97,41 +99,39 @@ const scorePayload = ({
dateLow: null,
// field availability map is keyed by fieldName and tracks each field's frequency
fieldAvailabilityMap: {},
expansionInfo,
recordHashCountMap: {},
responses: []
expansionInfo
};
}

const availabilityMap = resourceAvailabilityMap[resourceName];

// handle responses
availabilityMap.responses.push({
responses.push({
requestUri,
responseTimeMs,
responseBytes,
resourceName,
expansionInfo,
strategy,
hasError: hasError || undefined
});

// Score records
records.forEach(record => {

const recordsToProcess = [];

if (!!expansionInfo) {
// separate expanded records from main one
recordsToProcess.push(...expansionInfo.reduce(
(acc,
({ fieldName, modelName }) => {
recordsToProcess.push(
...expansionInfo.reduce((acc, { fieldName, modelName }) => {
const { [fieldName]: expandedData, ...remainingData } = record;

if (!!expandedData) {
const expandedRecords = Array.isArray(expandedData) ? [...expandedData] : [expandedData];
expandedRecords.map(expandedRecord =>
acc.push({
modelName,
fieldName,
isExpansion: true,
record: expandedRecord
})
Expand All @@ -146,25 +146,27 @@ const scorePayload = ({
};

return acc;
}, [])));
}, [])
);
} else {
// if no expansions, process the entire record
recordsToProcess.push({
modelName: resourceName,
isExpansion: false,
record
})
});
}

// process both parent record and expanded data
recordsToProcess.forEach(recordToProcess => {
const { isExpansion = false, modelName, record = {} } = recordToProcess;

// expansions have been removed at this point and classified as their own records
const recordHash = hashJsonData(record);

// only score if record not already present
if (!resourceAvailabilityMap?.[modelName]?.recordHashCountMap?.[recordHash]) {
availabilityMap.recordHashCountMap[recordHash] = 1;
if (!recordHashCountMap?.[recordHash]) {
recordHashCountMap[recordHash] = 1;

Object.entries(record).forEach(([fieldName, value]) => {
// init the field if it doesn't exist
Expand All @@ -177,37 +179,45 @@ const scorePayload = ({
}

// if there's a value, it can either be a primitive, or array/object
const { isPrimitive, isArray } = isObjectOrArrayOrPrimitive(value);
const { isPrimitive, isArray, isObject } = isObjectOrArrayOrPrimitive(value);

// functions aren't allowed here, so this covers everything
if (!!value && (isPrimitive || Object.values(value)?.length)) {
// increment usage
availabilityMap.fieldAvailabilityMap[fieldName].frequency++;

processSpecialFields({ availabilityMap, fieldName, dateField, value });

// TODO: Enumerations

// // process expansions, if present
// if (isExpansion && fieldName === expandedFieldName) {
// // TODO: look up the resource name for the expanded field and determine whether it's a collection or not
// // for now, just use Media

// scorePayload({
// requestUri,
// records: isArray ? value : [value],
// resourceAvailabilityMap,
// resourceName,
// isCollection: isArray,
// responseBytes: calculateJsonSize(value),
// ...otherParams
// });
// }
if (!!value) {
if (isArray) {
console.debug('TODO: need to process arrays! value: ' + JSON.stringify(value));
} else if (isPrimitive || isObject) {
// increment usage
availabilityMap.fieldAvailabilityMap[fieldName].frequency++;

processSpecialFields({ availabilityMap, fieldName, dateField, value });

// TODO: Enumerations

// // process expansions, if present
// if (isExpansion && fieldName === expandedFieldName) {
// // TODO: look up the resource name for the expanded field and determine whether it's a collection or not
// // for now, just use Media

// scorePayload({
// requestUri,
// records: isArray ? value : [value],
// resourceAvailabilityMap,
// resourceName,
// isCollection: isArray,
// responseBytes: calculateJsonSize(value),
// ...otherParams
// });
// }
} else {
console.debug('Found data with a type other than primitive, object, or array!');
}
}
});
} else {
console.debug('Found duplicate record hash!');

// increment record count if we see the same record more than once
availabilityMap.recordHashCountMap[recordHash]++;
recordHashCountMap[recordHash]++;
}
});
});
Expand Down Expand Up @@ -475,12 +485,10 @@ const processResponses = (resourceAvailabilityMap = {}) => {
*
* @param {Object} params
*/
const writeDataAvailabilityReport = async ({ version, resourceAvailabilityMap = {} }) => {
const writeDataAvailabilityReport = async ({ version, resourceAvailabilityMap = {}, responses = []}) => {
try {
const generatedOn = new Date().toISOString();

const { responses = [], ...remainingAvailabilityMap } = resourceAvailabilityMap;

// write responses report
await writeFile(
AVAILABILITY_RESPONSES_FILENAME,
Expand All @@ -496,6 +504,8 @@ const writeDataAvailabilityReport = async ({ version, resourceAvailabilityMap =
)
);

// report the responses file here: the write that just completed targets AVAILABILITY_RESPONSES_FILENAME
console.log(`Response info written to ${AVAILABILITY_RESPONSES_FILENAME}`);

// write DA report
await writeFile(
AVAILABILITY_REPORT_FILENAME,
Expand All @@ -504,14 +514,14 @@ const writeDataAvailabilityReport = async ({ version, resourceAvailabilityMap =
description: 'RESO Data Availability Report',
version,
generatedOn,
...remainingAvailabilityMap
...resourceAvailabilityMap
},
null,
' '
)
);

console.log(`Results written to ${AVAILABILITY_REPORT_FILENAME}`);
console.log(`Availability results written to ${AVAILABILITY_REPORT_FILENAME}`);
} catch (err) {
console.error(err);
}
Expand Down

0 comments on commit 8f2b978

Please sign in to comment.