Skip to content

Commit

Permalink
#54: Added request generator
Browse files: browse the repository at this point in the history
  • Loading branch information
darnjo committed Oct 5, 2023
1 parent 6a65abf commit d63c478
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 75 deletions.
12 changes: 7 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ if (require?.main === module) {
program
.command('replicate')
.description('Replicates data from a given resource with expansions.')
.requiredOption('-s, --strategy <string>', 'One of TopAndSkip, ModificationTimestampAsc, ModificationTimestampDesc, or NextLink')
.option('-u, --url <string>', 'The URL to start replicating from')
.requiredOption('-s, --strategy <string>', 'One of TopAndSkip, TimestampAsc, TimestampDesc, or NextLink')
.option('-u, --serviceRootUri <string>', 'OData service root URI (no resource name or query)')
.option('-b, --bearerToken <string>', 'Bearer token to use for authorization')
.option('-p, --pathToConfigFile', 'Path to config containing credentials')
.option('-m, --pathToMetadataReportJson <string>', 'Path to metadata report JSON')
.option('-r, --resourceName <string>', 'Resource name to replicate data from')
.option('-x, --expansions <items>', 'Comma-separated list of items to expand during the query process, e.g. Media,OpenHouse')
.option('-m, --metadataReportPath <string>', 'Path to metadata report to use for replication')
.option('-f, --filter <string>', 'OData $filter expression')
.option('-t, --top <number>', 'Optional parameter to use for OData $top')
.option('-s, --maxPageSize <number>', 'Optional parameter for the odata.maxpagesize header')
.option('-o, --outputPath <string>', 'Name of directory for results')
.option('-l, --limit <number>', 'Limit for total number of records')
.option('-l, --limit <number>', 'Limit total number of records at client level')
.action(replicate);

program
Expand Down
155 changes: 95 additions & 60 deletions lib/replication/index.js
Original file line number Diff line number Diff line change
@@ -1,94 +1,129 @@
'use strict';

const { replicationIterator, REPLICATION_STRATEGIES } = require('./replication-iterator');
const { DEFAULT_DD_VERSION, NOT_OK, handleError, scorePayload, writeDataAvailabilityReport, displayRuntimeInfo } = require('./utils');
const { writeFile, mkdir } = require('fs/promises');
const { parseResourceNameFromODataRequestUri, buildOutputFilePath } = require('./utils');
const { join } = require('path');

const {
DEFAULT_DD_VERSION,
handleError,
scorePayload,
writeDataAvailabilityReport,
displayRuntimeInfo,
prepareRequests,
buildOutputFilePath
} = require('./utils');

/**
* Replicates data from an OData service using the given strategy, credentials, and options.
*
* @param {Object} args a single options object; its properties are destructured in the function signature
* @returns this function has no return value, but it produces side effects: when outputPath is given, each page of results is written to a file
*/
const replicate = async ({ url: initialRequestUri, strategy, bearerToken, outputPath, limit, expansions = ['Media' /* TODO */] }) => {
const replicate = async ({
serviceRootUri,
strategy,
bearerToken,
clientCredentials = {},
outputPath,
limit,
resourceName,
expansions,
metadataReportJson = {},
pathToMetadataReportJson = '',
filter,
top,
orderby
}) => {
if (!Object.values(REPLICATION_STRATEGIES).includes(strategy)) {
throw new Error(`Unknown strategy: '${strategy}'!`);
}

const resourceName = parseResourceNameFromODataRequestUri(initialRequestUri),
shouldSaveResults = !!outputPath,
resultsPath = shouldSaveResults ? buildOutputFilePath(outputPath, resourceName) : null;
// expansions will be a comma-separated list for now, if passed
const expansionsArray = expansions?.split(',').map(x => x?.trim()) || [];

const requests = await prepareRequests({
serviceRootUri,
metadataReportJson,
pathToMetadataReportJson,
resourceName,
expansions: expansionsArray,
filter,
top,
orderby
});

const startTime = Date.now(),
shouldSaveResults = !!outputPath,
resourceAvailabilityMap = {};

try {
for await (const {
hasResults = false,
hasError = false,
responseTimeMs = 0,
response = {},
error = {},
startTime = 0,
stopTime = 0,
totalRecordsFetched = 0,
pagesFetched = 0,
requestUri,
responseBytes = 0
} of replicationIterator({ initialRequestUri, strategy, authInfo: { bearerToken }, limit })) {
try {
//handle errors
if (hasError) {
handleError(error);
}
for await (const request of requests) {
const { requestUri: initialRequestUri } = request;

//process results
if (hasResults) {
//TODO: call schema validator if flag is set
try {
for await (const {
hasResults = false,
hasError = false,
responseTimeMs = 0,
response = {},
error = {},
startTime = 0,
stopTime = 0,
totalRecordsFetched = 0,
pagesFetched = 0,
responseBytes = 0
} of replicationIterator({ initialRequestUri, strategy, authInfo: { bearerToken, clientCredentials }, limit })) {
try {
//handle errors
if (hasError) {
handleError(error);
}

scorePayload({
requestUri,
records: response.value,
resourceAvailabilityMap, // mutated on each call
resourceName,
expansions,
responseTimeMs,
startTime,
stopTime,
responseBytes
});
//process results
if (hasResults) {
//TODO: call schema validator if flag is set

if (shouldSaveResults) {
await mkdir(resultsPath, { recursive: true });
await writeFile(join(resultsPath, `page-${pagesFetched}.json`), JSON.stringify(response));
scorePayload({
...request,
records: response.value,
resourceAvailabilityMap, // mutated on each call
responseTimeMs,
startTime,
stopTime,
responseBytes
});

if (shouldSaveResults) {
const resultsPath = buildOutputFilePath(outputPath, request?.resourceName);
await mkdir(resultsPath, { recursive: true });
await writeFile(join(resultsPath, `page-${pagesFetched}.json`), JSON.stringify(response));
}
}
}

if (!!limit && totalRecordsFetched >= limit) {
break;
if (!!limit && totalRecordsFetched >= limit) {
break;
}
} catch (err) {
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
return;
}
} catch (err) {
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
return NOT_OK;
}
}
} catch (err) {
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
return NOT_OK;
} finally {

try {
await writeDataAvailabilityReport({ version: DEFAULT_DD_VERSION, resourceAvailabilityMap });
} catch (err) {
console.error(`Could not write data availability report! ${err}`);
//TODO: add logic to allow fast fail, in which case we'd return here
console.error(err);
return;
}
}

displayRuntimeInfo({ version: DEFAULT_DD_VERSION, startTime, resourceAvailabilityMap });
try {
await writeDataAvailabilityReport({ version: DEFAULT_DD_VERSION, resourceAvailabilityMap });
} catch (err) {
console.error(`Could not write data availability report! ${err}`);
}

displayRuntimeInfo({ version: DEFAULT_DD_VERSION, startTime, resourceAvailabilityMap });

return;
};

Expand Down
Loading

0 comments on commit d63c478

Please sign in to comment.