Skip to content

Commit

Permalink
#54: Improved replication iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
darnjo committed Oct 1, 2023
1 parent 64d88f8 commit 573c8e0
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 126 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,4 @@ output
data-availability-report.json
metadata-report.json
metadata-report*.json
reso-replication-output
28 changes: 28 additions & 0 deletions lib/misc/upi-parser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict';

// Lowercase component identifiers that may appear as ':identifier:' delimiters in a UPI NSS string.
const wellKnownIdentifiers = [
  'country',
  'stateorprovince',
  'county',
  'subcounty',
  'propertytype',
  'subpropertytype',
  'parcelnumber',
  'subparcelnumber'
];

// Property name used in the parsed result for each well-known identifier.
const upiPropertyNames = Object.freeze({
  country: 'country',
  stateorprovince: 'stateOrProvince',
  county: 'county',
  subcounty: 'subCounty',
  propertytype: 'propertyType',
  subpropertytype: 'subPropertyType',
  parcelnumber: 'parcelNumber',
  subparcelnumber: 'subParcelNumber'
});

// Matches any ':identifier:' delimiter. The capture group makes String.prototype.split
// keep the matched identifier in its output, so values can be assigned by name.
const upiDelimiterRegex = new RegExp(`:(${wellKnownIdentifiers.join('|')}):`);

/*
  nss example:
  ':country:us:stateorprovince:ca:county:06037:subcounty::propertytype:residential:subpropertytype::parcelnumber: [abc] 1-2 :: 3:456 :subparcelnumber:';
*/

/**
 * Parses the namespace-specific string (nss) of a UPI into its components.
 *
 * Components are matched by their ':identifier:' delimiter rather than by position,
 * so a missing or reordered component does not shift the remaining values.
 * Values are returned exactly as they appear (no trimming); a component that is
 * present but empty yields '', and one that is absent yields undefined.
 * If an identifier occurs more than once, the first occurrence wins.
 *
 * @param {Object} [options]
 * @param {string} [options.version='2.0'] accepted for interface compatibility; not read by the parser
 * @param {string} [options.nss=''] the string to parse; non-string values are treated as empty
 * @returns {{country, stateOrProvince, county, subCounty, propertyType, subPropertyType, parcelNumber, subParcelNumber}}
 */
const upiParser = ({ version = '2.0', nss = '' } = {}) => {
  // always return every key so callers can destructure safely
  const parsed = Object.fromEntries(wellKnownIdentifiers.map(identifier => [upiPropertyNames[identifier], undefined]));

  if (typeof nss !== 'string' || !nss.length) {
    return parsed;
  }

  // split() yields [leadingText, identifier1, value1, identifier2, value2, ...]
  const [, ...tokens] = nss.split(upiDelimiterRegex);

  for (let i = 0; i + 1 < tokens.length; i += 2) {
    const propertyName = upiPropertyNames[tokens[i]];

    if (parsed[propertyName] === undefined) {
      parsed[propertyName] = tokens[i + 1];
    }
  }

  return parsed;
};

module.exports = {
upiParser
};
73 changes: 40 additions & 33 deletions lib/replication/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@

const { replicationIterator, REPLICATION_STRATEGIES } = require('./replication-iterator');
const { scorePayload, writeDataAvailabilityReport } = require('./utils');
const humanizeDuration = require('humanize-duration');
const { writeFile, mkdir } = require('fs/promises');
const { join } = require('path');
const humanizeDuration = require('humanize-duration');

const REPLICATION_DIRECTORY_NAME = 'reso-replication-output';

/**
 * Extracts the resource name from an OData request URI, i.e. the last non-empty
 * path segment before the query string.
 *
 * Example: 'https://api.reso.org/Property?$top=5' => 'Property'
 *
 * @param {string} [requestUri=''] absolute request URI
 * @returns {string|null} the last path segment, or null when the URI cannot be
 *   parsed (the error is logged) or has no path segments
 */
const parseResourceNameFromODataRequestUri = (requestUri = '') => {
  try {
    // new URL() throws on invalid input; pathname never includes the query string
    const { pathname } = new URL(requestUri);

    // drop empty segments so a trailing slash doesn't yield ''
    const segments = pathname.split('/').filter(segment => segment.length > 0);

    return segments[segments.length - 1] ?? null;
  } catch (err) {
    console.error(err);
    return null;
  }
};

/**
 * Builds the directory path that replication output for a resource is written to:
 * <outputPath>/<REPLICATION_DIRECTORY_NAME>/<resourceName>/<timestamp>
 *
 * The timestamp is the current time in ISO 8601 format with ':' replaced by '-'.
 *
 * @param {string} outputPath base output directory
 * @param {string} resourceName name of the resource being replicated
 * @returns {string} the joined path
 */
const buildOutputFilePath = (outputPath, resourceName) => {
  const timestampSegment = new Date().toISOString().replaceAll(':', '-');

  return join(outputPath, REPLICATION_DIRECTORY_NAME, resourceName, timestampSegment);
};

const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, limit, expansions = [] }) => {
if (!Object.values(REPLICATION_STRATEGIES).includes(strategy)) {
throw new Error(`Unknown strategy: '${strategy}'!`);
Expand All @@ -32,65 +38,66 @@ const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, l
const resourceName = parseResourceNameFromODataRequestUri(requestUri),
shouldSaveResults = !!outputPath,
resultsPath = shouldSaveResults
? `${outputPath}/reso-replication-output/${resourceName}-${new Date().toISOString().replaceAll(':', '-')}`
? buildOutputFilePath(outputPath, resourceName)
: null;

const startTime = Date.now(),
responseTimes = [];

let recordsFetched = 0,
pagesFetched = 0;
resourceAvailabilityMap = {};

/*
Availability map schema:
{
Property: {
numRecordsFetched: 300
fieldMap: {
ListPrice: 234,
Media: 123
}
}
}
*/
const resourceAvailabilityMap = {};
let responseTimes = [], totalRecordCount = 0;

try {
for await (const data of replicationIterator(config)) {
if (data?.hasResults) {
responseTimes.push(data?.responseTimeMs ?? 0);
recordsFetched = data?.recordsFetched ?? 0;

if (data?.response?.value?.length) {
for await (const {
hasResults = false,
responseTimeMs = 0,
response = {},
startTime = 0,
stopTime = 0,
totalRecordsFetched = 0,
pagesFetched = 0
} of replicationIterator(config)) {
if (hasResults) {
if (response?.value?.length) {
// resourceAvailabilityMap is mutated here
scorePayload(data.response.value, resourceAvailabilityMap, resourceName, expansions);
scorePayload({
requestUri,
records: response.value,
resourceAvailabilityMap,
resourceName,
expansions,
responseTimeMs,
startTime,
stopTime
});

if (shouldSaveResults) {
await mkdir(resultsPath, { recursive: true });
await writeFile(`${resultsPath}/page-${++pagesFetched}.json`, JSON.stringify(data.response));
await writeFile(`${resultsPath}/page-${pagesFetched}.json`, JSON.stringify(response));
}
}

totalRecordCount = totalRecordsFetched;
responseTimes.push(responseTimeMs);
}

if (!!limit && recordsFetched >= limit) {
if (!!limit && totalRecordsFetched >= limit) {
break;
}
}

console.log(`\nReplication completed in ~${humanizeDuration(Date.now() - startTime, { round: true })}!`);
console.log(
`Total requests: ${responseTimes?.length}, Average response time: ${parseInt(
`Total requests: ${responseTimes?.length || 0}, Average response time: ${parseInt(
responseTimes?.reduce((acc, item) => {
if (item) {
acc += item;
}
return acc;
}, 0) / (responseTimes.length || 1)
)}ms, Total records: ${recordsFetched}\n`
)}ms, Total records: ${totalRecordCount}\n`
);

await writeDataAvailabilityReport(resourceName, resourceAvailabilityMap, recordsFetched);
await writeDataAvailabilityReport({ version: '1.7', resourceAvailabilityMap });
} catch (err) {
console.error(err);
}
Expand Down
56 changes: 33 additions & 23 deletions lib/replication/replication-iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const REPLICATION_STRATEGIES = Object.freeze({

/**
 * Builds the HTTP Authorization header for a bearer token.
 *
 * @param {string} [token=''] bearer token
 * @returns {Object} `{ Authorization: 'Bearer <token>' }` when the token has a
 *   non-zero length, otherwise an empty object
 */
const getBearerTokenAuthHeader = (token = '') => {
  if (!token?.length) {
    return {};
  }

  return { Authorization: `Bearer ${token}` };
};

const buildRequestUri = ({ requestUri, strategy, recordsFetched = 0, pageSize, lastIsoTimestamp, nextLink }) => {
const buildRequestUri = ({ requestUri, strategy, totalRecordsFetched = 0, pageSize, lastIsoTimestamp, nextLink }) => {
const [baseUri = null, query = null] = requestUri.split('?');

const queryParams = query !== null ? queryString.parse(query) : {};
Expand All @@ -36,7 +36,7 @@ const buildRequestUri = ({ requestUri, strategy, recordsFetched = 0, pageSize, l
delete remainingParams.$skip;
const remainingQueryString = queryString.stringify(remainingParams) ?? '';

return `${baseUri}?$top=${$top}&$skip=${recordsFetched}${remainingQueryString?.length ? `&${remainingQueryString}` : ''}`;
return `${baseUri}?$top=${$top}&$skip=${totalRecordsFetched}${remainingQueryString?.length ? `&${remainingQueryString}` : ''}`;
} else if (strategy === REPLICATION_STRATEGIES.TIMESTAMP_ASC) {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
} else if (strategy === REPLICATION_STRATEGIES.TIMESTAMP_DESC) {
Expand All @@ -53,11 +53,12 @@ async function* replicationIterator(config = {}) {

const { bearerToken, clientCredentials } = authInfo;

let successfulRequestCount = 0,
errorRequestCount = 0,
recordsFetched = 0,
pageSize = DEFAULT_PAGE_SIZE;

let
pageSize = DEFAULT_PAGE_SIZE,
pagesFetched = 0,
numErrors = 0,
totalRecordsFetched = 0;

//GET https://api.reso.org/Property
let requestUri = initialRequestUri,
lastRequestUri = null,
Expand All @@ -82,7 +83,7 @@ async function* replicationIterator(config = {}) {
requestUri = buildRequestUri({
requestUri,
strategy,
recordsFetched,
totalRecordsFetched,
pageSize,
lastIsoTimestamp,
nextLink
Expand All @@ -94,15 +95,22 @@ async function* replicationIterator(config = {}) {
}

let responseTimeMs = 0,
startTime;
startTime, stopTime;

try {

//request records
console.log(`Fetching records from '${requestUri}'...`);
startTime = Date.now();

const startTimeMs = new Date();
const response = await fetch(requestUri, { headers });
responseTimeMs = Date.now() - startTime;
const stopTimeMs = new Date();

startTime = startTimeMs.toISOString();
stopTime = stopTimeMs.toISOString();

//TODO: legacy property - deprecate
responseTimeMs = stopTimeMs - startTimeMs;

//set state
lastRequestUri = requestUri;
Expand All @@ -113,44 +121,46 @@ async function* replicationIterator(config = {}) {
if (response.ok) {
pageSize = responseJson[ODATA_VALUE_PROPERTY_NAME]?.length ?? 0;
nextLink = responseJson[ODATA_NEXT_LINK_PROPERTY_NAME] ?? null;
recordsFetched += pageSize;
totalRecordsFetched += pageSize;

if (pageSize) {
console.log(
`Request succeeded! Time taken: ${responseTimeMs}ms. Records fetched: ${pageSize}. ` +
`Total records fetched: ${recordsFetched}\n`
`Total records fetched: ${totalRecordsFetched}\n`
);
} else {
console.log('No records to fetch!');
}

//if the response was OK, the request was successful even if no records
successfulRequestCount++;
pagesFetched++;
} else {
//TODO: when there's an unsuccessful request, sometimes the error message is in the response body
console.error(`${JSON.stringify(responseJson)}\n`);
errorRequestCount++;
console.error(response);
numErrors++;
error = response?.statusText ?? null;
// toISOString must be called; without the parentheses stopTime holds the function itself
stopTime = new Date().toISOString();
}
} catch (err) {
console.error(`${JSON.stringify(err)}\n`);
errorRequestCount++;
console.error(err);
numErrors++;
error = err;
}

yield {
requestUri,
responseStatus,
responseTimeMs,
startTime,
stopTime,
response: responseJson,
hasResults: pageSize > 0,
pageSize,
recordsFetched,
successfulRequestCount,
errorRequestCount,
totalRecordsFetched,
pagesFetched,
numErrors,
error
};
} while (pageSize > 0 && recordsFetched < MAX_RECORD_COUNT_DEFAULT && errorRequestCount < maxErrorCount);
} while (pageSize > 0 && totalRecordsFetched < MAX_RECORD_COUNT_DEFAULT && numErrors < maxErrorCount);
}

/**
Expand Down
Loading

0 comments on commit 573c8e0

Please sign in to comment.