Skip to content

Commit

Permalink
#54: Initial checkin of basic TopAndSkip replication
Browse files Browse the repository at this point in the history
  • Loading branch information
darnjo committed Sep 26, 2023
1 parent 36327a1 commit 99256fa
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,5 @@ dist
.tern-port

.DS_Store
.vscode
output
14 changes: 14 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
const { restore } = require('./lib/restore-utils');
const { runTests } = require('./lib/batch-test-runner');
const { findVariations, computeVariations } = require('./lib/find-variations/index.js');
const { replicate } = require('./lib/replication/index.js');

if (require?.main === module) {
const { program } = require('commander');
Expand Down Expand Up @@ -33,10 +34,23 @@ if (require?.main === module) {
.description('Finds possible variations in metadata using a number of methods.')
.action(findVariations);

program
.command('replicate')
.requiredOption('-s, --strategy <string>', 'One of TopAndSkip, ModificationTimestampAsc, ModificationTimestampDesc, or NextLink')
.option('-u, --url <string>', 'The URL to start replicating from')
.option('-b, --bearerToken <string>', 'Bearer token to use for authorization')
.option('-p, --pathToConfigFile', 'Path to config containing credentials')
.option('-r, --resourceName <string>', 'Resource name to replicate data from')
.option('-x, --expansions <string>', 'Items to expand during the query process, e.g. Media')
.option('-m, --metadataReportPath', 'Path to metadata report to use for replication')
.description('Replicates data from a given resource with expansions.')
.action(replicate);

program.parse();
}

module.exports = {
replicate,
restore,
runTests,
findVariations,
Expand Down
29 changes: 29 additions & 0 deletions lib/replication/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# RESO Replication Client
The RESO Replication Client can be used to fetch data from a given URL using a number of different replication strategies and supports OAuth 2 bearer tokens and client credentials.

## View Help
Use the following command to view help info:

```
$ reso-certification-utils replicate --help
Usage: reso-certification-utils replicate [options]
Replicates data from a given resource with expansions.
Options:
-s, --strategy <string> One of TopAndSkip, ModificationTimestampAsc, ModificationTimestampDesc, or NextLink
-u, --url <string> The URL to start replicating from
-b, --bearerToken <string> Bearer token to use for authorization
-p, --pathToConfigFile Path to config containing credentials
-r, --resourceName <string> Resource name to replicate data from
-x, --expansions <string> Items to expand during the query process, e.g. Media
-m, --metadataReportPath Path to metadata report to use for replication
-h, --help display help for command
```

## Example: Replicate Data from a URL Using `TopAndSkip`
Replicating from `https://some.api.com/Property` can be done as follows:
```
$ reso-certification-utils replicate -s TopAndSkip -u https://some.api.com/Property -b <your test token>
```
34 changes: 34 additions & 0 deletions lib/replication/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const { replicationIterator, REPLICATION_STRATEGIES } = require('./replication-iterator');
const humanizeDuration = require('humanize-duration');

const replicate = async ({ url: requestUri, strategy, bearerToken, expansions }) => {
if (!Object.values(REPLICATION_STRATEGIES).includes(strategy)) {
throw new Error(`Unknown strategy: '${strategy}'!`);
}

const config = {
requestUri,
strategy: strategy,
authInfo: {
bearerToken
},
expansions
};

const startTime = Date.now();

for await (const data of replicationIterator(config)) {
if (data?.hasResults) {
console.log('Data fetched!');
}
}

console.log(`\nReplication completed in ${humanizeDuration(Date.now() - startTime)}!`);
return;
};

module.exports = {
replicate
};
146 changes: 146 additions & 0 deletions lib/replication/replication-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
'use strict';

const queryString = require('node:querystring');

const MAX_RECORD_COUNT_DEFAULT = 100000,
DEFAULT_PAGE_SIZE = 1000,
ODATA_VALUE_PROPERTY_NAME = 'value';

const REPLICATION_STRATEGIES = Object.freeze({
TOP_AND_SKIP: 'TopAndSkip',
TIMESTAMP_ASC: 'TimestampAsc',
TIMESTAMP_DESC: 'TimestampDesc',
NEXT_LINK: 'NextLink'
});

const getBearerTokenAuthHeader = (token = '') => (token?.length ? { Authorization: `Bearer ${token}` } : {});

const buildRequestUri = ({ requestUri, strategy, currentRecordCount = 0, lastPageCount, /* lastIsoTimestamp, nextLink */ }) => {
const [baseUri = null, query = null] = requestUri.split('?');

const queryParams = query !== null ? queryString.parse(query) : {};

if (strategy === REPLICATION_STRATEGIES.TOP_AND_SKIP) {
const { $top: top = lastPageCount ?? DEFAULT_PAGE_SIZE, ...remainingParams } = queryParams;

//$skip param from queryParams is always ignored
delete remainingParams.$skip;
const remainingQueryString = queryString.stringify(remainingParams) ?? '';

return `${baseUri}?$top=${top}&$skip=${currentRecordCount}${remainingQueryString?.length ? `&${remainingQueryString}` : ''}`;
} else if (strategy === REPLICATION_STRATEGIES.TIMESTAMP_ASC) {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
} else if (strategy === REPLICATION_STRATEGIES.TIMESTAMP_DESC) {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
} else if (strategy === REPLICATION_STRATEGIES.NEXT_LINK) {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
} else {
throw new Error(`Unsupported replication strategy '${strategy}'!`);
}
};

async function* replicationIterator(config = {}) {
const {
requestUri: initialRequestUri = '',
maxErrorCount = 3,
authInfo = {},
strategyInfo = { strategy: 'TopAndSkip', pageSize: DEFAULT_PAGE_SIZE }
} = config;

const { bearerToken, /* clientCredentials */ } = authInfo;

const headers = {
...getBearerTokenAuthHeader(bearerToken)
};

let successfulRequestCount = 0,
errorRequestCount = 0,
currentRecordCount = 0,
lastPageCount = DEFAULT_PAGE_SIZE;

//GET https://api.reso.org/Property
let requestUri = initialRequestUri,
lastRequestUri = null;

console.log('Request uri is: ' + requestUri);

do {
let responseJson = null,
responseStatus = 0,
error = null;
//lastIsoTimestamp = null,
//nextLink = null;

requestUri = buildRequestUri({
requestUri,
strategy: strategyInfo.strategy,
currentRecordCount,
lastPageCount,
//lastIsoTimestamp,
//nextLink
});

if (requestUri === lastRequestUri) {
throw new Error(`Same URLs found for consecutive requests!\n\tRequestUri: ${requestUri}\n\tLastRequestUri: ${lastRequestUri}`);
}

let responseTimeMs = 0, startTime;
try {
console.log(`Fetching records from '${requestUri}'...`);
startTime = Date.now();
const response = await fetch(requestUri, { headers });
responseTimeMs = Date.now() - startTime;

lastRequestUri = requestUri;
responseStatus = response.status;
responseJson = await response.json();

if (response.ok) {
lastPageCount = responseJson[`${ODATA_VALUE_PROPERTY_NAME}`]?.length ?? 0;
currentRecordCount += lastPageCount;

if (lastPageCount) {
console.log(
`Request succeeded! Time taken: ${responseTimeMs} ms. Records fetched: ${lastPageCount}. ` +
`Total records fetched: ${currentRecordCount}\n`
);
} else {
console.log('No records to fetch!');
}
successfulRequestCount++;
} else {
console.error(`${JSON.stringify(responseJson)}\n`);
errorRequestCount++;
error = response?.statusText ?? null;
}
} catch (err) {
console.error(`${JSON.stringify(err)}\n`);
errorRequestCount++;
error = err;
}

yield {
requestUri,
responseStatus,
responseTimeMs,
response: responseJson,
hasResults: lastPageCount > 0,
error,
successfulRequestCount,
errorRequestCount
};
} while (lastPageCount > 0 && currentRecordCount < MAX_RECORD_COUNT_DEFAULT && errorRequestCount < maxErrorCount);
}

/**
* Replication Iterator service provides an interface
* for requesting data from servers using a number of strategies:
* * OData Next Link
* * OData Top and Skip
* * OData Order By Timestamp (Asc/Desc)
*/

module.exports = {
REPLICATION_STRATEGIES,
replicationIterator
};
1 change: 1 addition & 0 deletions lib/replication/services/auth/oauth2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
//TODO
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"dotenv": "^16.0.1",
"fastest-levenshtein": "^1.0.16",
"fs-extra": "^10.0.0",
"humanize-duration": "^3.30.0",
"mocha": "^9.1.3",
"pascal-case": "^3.1.2",
"reso-certification-etl": "github:RESOStandards/reso-certification-etl#1c2fc0bb5ae1e6f71775cd81d7ae3db46da6bf86",
Expand Down

0 comments on commit 99256fa

Please sign in to comment.