diff --git a/.gitignore b/.gitignore index a318d3e..5babfc4 100644 --- a/.gitignore +++ b/.gitignore @@ -109,3 +109,4 @@ output data-availability-report.json metadata-report.json metadata-report*.json +reso-replication-output diff --git a/lib/misc/upi-parser.js b/lib/misc/upi-parser.js new file mode 100644 index 0000000..8474c59 --- /dev/null +++ b/lib/misc/upi-parser.js @@ -0,0 +1,28 @@ +'use strict'; + +const wellKnownIdentifiers = [ + 'country', + 'stateorprovince', + 'county', + 'subcounty', + 'propertytype', + 'subpropertytype', + 'parcelnumber', + 'subparcelnumber' +]; + +/* + nss example: + + ':country:us:stateorprovince:ca:county:06037:subcounty::propertytype:residential:subpropertytype::parcelnumber: [abc] 1-2 :: 3:456 :subparcelnumber:'; + +*/ +const upiParser = ({ version = '2.0', nss = '' }) => { + const regex = new RegExp(wellKnownIdentifiers.map(item => `:${item}:`).join('|')); + const [, country, stateOrProvince, county, subCounty, propertyType, subPropertyType, parcelNumber, subParcelNumber] = nss.split(regex); + return { country, stateOrProvince, county, subCounty, propertyType, subPropertyType, parcelNumber, subParcelNumber }; +}; + +module.exports = { + upiParser +}; diff --git a/lib/replication/index.js b/lib/replication/index.js index 0212fc8..b50a5b4 100644 --- a/lib/replication/index.js +++ b/lib/replication/index.js @@ -2,13 +2,16 @@ const { replicationIterator, REPLICATION_STRATEGIES } = require('./replication-iterator'); const { scorePayload, writeDataAvailabilityReport } = require('./utils'); -const humanizeDuration = require('humanize-duration'); const { writeFile, mkdir } = require('fs/promises'); +const { join } = require('path'); +const humanizeDuration = require('humanize-duration'); + +const REPLICATION_DIRECTORY_NAME = 'reso-replication-output'; // need to get the last part of the URL before the querystring const parseResourceNameFromODataRequestUri = (requestUri = '') => { try { - const [ resourceName = null ] = (new 
URL(requestUri))?.pathname?.split('/')?.slice(-1); + const [resourceName = null] = new URL(requestUri)?.pathname?.split('/')?.slice(-1); return resourceName; } catch (err) { console.error(err); @@ -16,6 +19,9 @@ const parseResourceNameFromODataRequestUri = (requestUri = '') => { } }; +const buildOutputFilePath = (outputPath, resourceName) => + join(outputPath, REPLICATION_DIRECTORY_NAME, resourceName, new Date().toISOString().replaceAll(':', '-')); + const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, limit, expansions = [] }) => { if (!Object.values(REPLICATION_STRATEGIES).includes(strategy)) { throw new Error(`Unknown strategy: '${strategy}'!`); @@ -32,65 +38,66 @@ const replicate = async ({ url: requestUri, strategy, bearerToken, outputPath, l const resourceName = parseResourceNameFromODataRequestUri(requestUri), shouldSaveResults = !!outputPath, resultsPath = shouldSaveResults - ? `${outputPath}/reso-replication-output/${resourceName}-${new Date().toISOString().replaceAll(':', '-')}` + ? buildOutputFilePath(outputPath, resourceName) : null; const startTime = Date.now(), - responseTimes = []; - - let recordsFetched = 0, - pagesFetched = 0; + resourceAvailabilityMap = {}; - /* - Availability map schema: - - { - Property: { - numRecordsFetched: 300 - fieldMap: { - ListPrice: 234, - Media: 123 - } - } - } - */ - const resourceAvailabilityMap = {}; + let responseTimes = [], totalRecordCount = 0; try { - for await (const data of replicationIterator(config)) { - if (data?.hasResults) { - responseTimes.push(data?.responseTimeMs ?? 0); - recordsFetched = data?.recordsFetched ?? 
0; - - if (data?.response?.value?.length) { + for await (const { + hasResults = false, + responseTimeMs = 0, + response = {}, + startTime = 0, + stopTime = 0, + totalRecordsFetched = 0, + pagesFetched = 0 + } of replicationIterator(config)) { + if (hasResults) { + if (response?.value?.length) { // resourceAvailabilityMap is mutated here - scorePayload(data.response.value, resourceAvailabilityMap, resourceName, expansions); + scorePayload({ + requestUri, + records: response.value, + resourceAvailabilityMap, + resourceName, + expansions, + responseTimeMs, + startTime, + stopTime + }); if (shouldSaveResults) { await mkdir(resultsPath, { recursive: true }); - await writeFile(`${resultsPath}/page-${++pagesFetched}.json`, JSON.stringify(data.response)); + await writeFile(`${resultsPath}/page-${pagesFetched}.json`, JSON.stringify(response)); } } + + totalRecordCount = totalRecordsFetched; + responseTimes.push(responseTimeMs); } - if (!!limit && recordsFetched >= limit) { + if (!!limit && totalRecordsFetched >= limit) { break; } } console.log(`\nReplication completed in ~${humanizeDuration(Date.now() - startTime, { round: true })}!`); console.log( - `Total requests: ${responseTimes?.length}, Average response time: ${parseInt( + `Total requests: ${responseTimes?.length || 0}, Average response time: ${parseInt( responseTimes?.reduce((acc, item) => { if (item) { acc += item; } return acc; }, 0) / (responseTimes.length || 1) - )}ms, Total records: ${recordsFetched}\n` + )}ms, Total records: ${totalRecordCount}\n` ); - await writeDataAvailabilityReport(resourceName, resourceAvailabilityMap, recordsFetched); + await writeDataAvailabilityReport({ version: '1.7', resourceAvailabilityMap }); } catch (err) { console.error(err); } diff --git a/lib/replication/replication-iterator.js b/lib/replication/replication-iterator.js index feaf9d9..40202b4 100644 --- a/lib/replication/replication-iterator.js +++ b/lib/replication/replication-iterator.js @@ -24,7 +24,7 @@ const 
REPLICATION_STRATEGIES = Object.freeze({ const getBearerTokenAuthHeader = (token = '') => (token?.length ? { Authorization: `Bearer ${token}` } : {}); -const buildRequestUri = ({ requestUri, strategy, recordsFetched = 0, pageSize, lastIsoTimestamp, nextLink }) => { +const buildRequestUri = ({ requestUri, strategy, totalRecordsFetched = 0, pageSize, lastIsoTimestamp, nextLink }) => { const [baseUri = null, query = null] = requestUri.split('?'); const queryParams = query !== null ? queryString.parse(query) : {}; @@ -36,7 +36,7 @@ const buildRequestUri = ({ requestUri, strategy, recordsFetched = 0, pageSize, l delete remainingParams.$skip; const remainingQueryString = queryString.stringify(remainingParams) ?? ''; - return `${baseUri}?$top=${$top}&$skip=${recordsFetched}${remainingQueryString?.length ? `&${remainingQueryString}` : ''}`; + return `${baseUri}?$top=${$top}&$skip=${totalRecordsFetched}${remainingQueryString?.length ? `&${remainingQueryString}` : ''}`; } else if (strategy === REPLICATION_STRATEGIES.TIMESTAMP_ASC) { throw new Error(`Unsupported replication strategy '${strategy}'!`); } else if (strategy === REPLICATION_STRATEGIES.TIMESTAMP_DESC) { @@ -53,11 +53,12 @@ async function* replicationIterator(config = {}) { const { bearerToken, clientCredentials } = authInfo; - let successfulRequestCount = 0, - errorRequestCount = 0, - recordsFetched = 0, - pageSize = DEFAULT_PAGE_SIZE; - + let + pageSize = DEFAULT_PAGE_SIZE, + pagesFetched = 0, + numErrors = 0, + totalRecordsFetched = 0; + //GET https://api.reso.org/Property let requestUri = initialRequestUri, lastRequestUri = null, @@ -82,7 +83,7 @@ async function* replicationIterator(config = {}) { requestUri = buildRequestUri({ requestUri, strategy, - recordsFetched, + totalRecordsFetched, pageSize, lastIsoTimestamp, nextLink @@ -94,15 +95,22 @@ async function* replicationIterator(config = {}) { } let responseTimeMs = 0, - startTime; + startTime, stopTime; try { //request records console.log(`Fetching records 
from '${requestUri}'...`); - startTime = Date.now(); + + const startTimeMs = new Date(); const response = await fetch(requestUri, { headers }); - responseTimeMs = Date.now() - startTime; + const stopTimeMs = new Date(); + + startTime = startTimeMs.toISOString(); + stopTime = stopTimeMs.toISOString(); + + //TODO: legacy property - deprecate + responseTimeMs = stopTimeMs - startTimeMs; //set state lastRequestUri = requestUri; @@ -113,28 +121,28 @@ async function* replicationIterator(config = {}) { if (response.ok) { pageSize = responseJson[ODATA_VALUE_PROPERTY_NAME]?.length ?? 0; nextLink = responseJson[ODATA_NEXT_LINK_PROPERTY_NAME] ?? null; - recordsFetched += pageSize; + totalRecordsFetched += pageSize; if (pageSize) { console.log( `Request succeeded! Time taken: ${responseTimeMs}ms. Records fetched: ${pageSize}. ` + - `Total records fetched: ${recordsFetched}\n` + `Total records fetched: ${totalRecordsFetched}\n` ); } else { console.log('No records to fetch!'); } - //if the response was OK, the request was successful even if no records - successfulRequestCount++; + pagesFetched++; } else { //TODO: when there's an unsuccessful request, sometimes the error message is in the response body - console.error(`${JSON.stringify(responseJson)}\n`); - errorRequestCount++; + console.error(response); + numErrors++; error = response?.statusText ?? 
null; + stopTime = new Date().toISOString(); + } } catch (err) { - console.error(`${JSON.stringify(err)}\n`); - errorRequestCount++; + console.error(err); + numErrors++; error = err; } @@ -142,15 +150,17 @@ async function* replicationIterator(config = {}) { requestUri, responseStatus, responseTimeMs, + startTime, + stopTime, response: responseJson, hasResults: pageSize > 0, pageSize, - recordsFetched, - successfulRequestCount, - errorRequestCount, + totalRecordsFetched, + pagesFetched, + numErrors, error }; - } while (pageSize > 0 && recordsFetched < MAX_RECORD_COUNT_DEFAULT && errorRequestCount < maxErrorCount); + } while (pageSize > 0 && totalRecordsFetched < MAX_RECORD_COUNT_DEFAULT && numErrors < maxErrorCount); } /** diff --git a/lib/replication/utils.js b/lib/replication/utils.js index 256c712..a6854ee 100644 --- a/lib/replication/utils.js +++ b/lib/replication/utils.js @@ -2,96 +2,145 @@ const { writeFile } = require('fs/promises'); -/* -{ - "description": "RESO Data Availability Report", - "version": "1.7", - "generatedOn": "2023-09-11T17:37:06.066Z", - "resources": [ - { - "resourceName": "Office", - "recordCount": 1751, - "numRecordsFetched": 1709, - "numSamples": 18, - "pageSize": 100, - "averageResponseBytes": 106953, - "averageResponseTimeMs": 547, - "dateField": "ModificationTimestamp", - "dateLow": "2019-08-14T13:59:06Z", - "dateHigh": "2023-08-28T13:46:24Z", - "keyFields": [ - "OfficeKey" //TODO: key fields needs to be in the metadata report instead - ] - }, - "fields": [ - - ], - "lookups": [], - "lookupValues": [] -} +const scorePayload = ({ + requestUri = '', + records = [], + resourceAvailabilityMap, + resourceName = '', + isExpansion = false, + expansions = [], + startTime, + stopTime +}) => { + // init if the resource doesn't exist + if (!resourceAvailabilityMap?.[resourceName]) { + resourceAvailabilityMap[resourceName] = { + resourceName, + recordCount: 0, + pageSize: records?.length ?? 
0, + requests: [], + dateLow: null, + dateHigh: null, + isExpansion: false, + parentResourceName: null, + // TODO: allow passing the date field name + dateField: 'ModificationTimestamp', + // field availability map is fieldName and frequency + fieldAvailabilityMap: {} + }; + + // TODO: need to deal with average response time ms and record count fields + resourceAvailabilityMap[resourceName].requests.push({ requestUri, startTime, stopTime, recordCount: records?.length ?? 0 }); + } -*/ -const scorePayload = (records = [], resourceAvailabilityMap = {}, resourceName, expansions = []) => { records.forEach(record => { Object.entries(record).forEach(([fieldName, value]) => { - if (!resourceAvailabilityMap?.[resourceName]) { - resourceAvailabilityMap[resourceName] = { + // init if the field if it doesn't exist + if (!resourceAvailabilityMap?.[resourceName]?.fieldAvailabilityMap?.[fieldName]) { + resourceAvailabilityMap[resourceName].fieldAvailabilityMap[fieldName] = { resourceName, - recordCount: 0, - numRecordsFetched: 0, - numSamples: 0, - pageSize: 0, - requests: [], - dateLow: null, - dateHigh: null, - isExpansion: false, - parentResourceName: null, - dateField: 'ModificationTimestamp', - fieldAvailabilityMap: {} - }; - } - - if (!resourceAvailabilityMap?.[resourceName]?.[fieldName]) { - resourceAvailabilityMap[resourceName][fieldName] = { fieldName, frequency: 0 }; } + // if there's a value, it can either be a primitive, or array/object if (!!value && (typeof value !== 'object' || Object.values(value)?.length)) { - resourceAvailabilityMap[resourceName][fieldName]++; + // increment usage + resourceAvailabilityMap[resourceName].fieldAvailabilityMap[fieldName].frequency++; + + // Update resource max and min dates + if (fieldName === resourceAvailabilityMap?.[resourceName]?.dateField) { + const dateValue = new Date(value), + currentDateLowValue = resourceAvailabilityMap[resourceName].dateLow, + currentDateHighValue = resourceAvailabilityMap[resourceName].dateHigh; + + if 
(dateValue.getTime() < (currentDateLowValue ? Date.parse(currentDateLowValue) : Infinity)) { + resourceAvailabilityMap[resourceName].dateLow = dateValue.toISOString(); + } + + if (dateValue.getTime() > (currentDateHighValue ? Date.parse(currentDateHighValue) : -Infinity)) { + resourceAvailabilityMap[resourceName].dateHigh = dateValue.toISOString(); + } + } - //TODO: expansions needs to be the resource name and expanded field name + // process expansions, if present - if (expansions?.includes[fieldName]) { + if (expansions?.includes(fieldName)) { - scorePayload(value, resourceAvailabilityMap, fieldName, expansions); + // TODO: look up the resource name for the expanded field and determine whether it's a collection or not + // for now, just use Media + const expandedResourceName = 'Media', + isCollection = true; + + // TODO: implement without recursion + scorePayload({ + requestUri, + records: Array.isArray(value) ? value : [value], + resourceAvailabilityMap, + parentResourceName: resourceName, + resourceName: expandedResourceName, + isExpansion: true, + isCollection, + expansions + }); } } + + if (isExpansion) { + //resourceAvailabilityMap[resourceName][fieldName] + console.log('Is Expansion! ' + resourceName + ', ' + fieldName); + } }); }); - - return resourceAvailabilityMap; }; -const consolidateResults = (resourceAvailabilityMap = {}) => { - Object.entries(resourceAvailabilityMap ?? {}).reduce((acc, [resourceName, fieldMap]) => { - - }, { - resources: [], - fields: [], - lookups: [] - }); +const consolidateResults = (resourceAvailabilityMap = {}) => + Object.values(resourceAvailabilityMap ?? {}).reduce( + (acc, resourceData) => { + const { fieldAvailabilityMap = {}, ...remainingResourceData } = resourceData; - return { - resources: [{ resourceName, numRecordsFetched }], - fields: Object.entries(resourceAvailabilityMap ?? {}).map(([resourceName, fieldMap]) => { - Object.entries(fieldMap ?? 
{}).map(([fieldName, frequency]) => { + if (Object.keys(remainingResourceData).length) { + acc.resources.push(remainingResourceData); + acc.fields.push(...Object.values(fieldAvailabilityMap)); + } - }); - }) - }; -}; + return acc; + }, + { + resources: [], + fields: [], + lookups: [] + } + ); -const writeDataAvailabilityReport = async (resourceAvailabilityMap = {}) => { +/* + { + "description": "RESO Data Availability Report", + "version": "1.7", + "generatedOn": "2023-09-11T17:37:06.066Z", + "resources": [ + { + "resourceName": "Office", + "recordCount": 1751, + "numRecordsFetched": 1709, + "numSamples": 18, + "pageSize": 100, + "averageResponseBytes": 106953, + "averageResponseTimeMs": 547, + "dateField": "ModificationTimestamp", + "dateLow": "2019-08-14T13:59:06Z", + "dateHigh": "2023-08-28T13:46:24Z", + "keyFields": [ + "OfficeKey" //TODO: key fields needs to be in the metadata report instead + ] + }, + "fields": [ + + ], + "lookups": [], + "lookupValues": [] + } +*/ +const writeDataAvailabilityReport = async ({ version, resourceAvailabilityMap = {} }) => { const AVAILABILITY_REPORT_FILENAME = 'data-availability-report.json'; try { @@ -100,7 +149,7 @@ const writeDataAvailabilityReport = async (resourceAvailabilityMap = {}) => { JSON.stringify( { description: 'RESO Data Availability Report', - version: '1.7', + version, generatedOn: new Date().toISOString(), ...consolidateResults(resourceAvailabilityMap) },