-
Notifications
You must be signed in to change notification settings - Fork 113
CUMULUS-3960: Updated PostToCmr task to be able to republish granules #3906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
acd395b
4f790ff
e9e1831
548fbf1
e356b99
863fe9b
6050fde
c7d9b94
13fa33f
d255ece
08128fc
ef1a31e
384b3af
7de83d4
958e18a
ad40a0f
a9072ef
cf2103b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,16 @@ const { | |
|
||
const log = new Logger({ sender: '@cumulus/cmrjs/src/cmr-utils' }); | ||
|
||
/** | ||
* @typedef {{ | ||
* provider: string, | ||
* clientId: string, | ||
* username?: string, | ||
* password?: string, | ||
* token?: string | ||
* }} CmrCredentials | ||
*/ | ||
|
||
function getS3KeyOfFile(file) { | ||
if (file.filename) return parseS3Uri(file.filename).Key; | ||
if (file.filepath) return file.filepath; | ||
|
@@ -233,6 +243,17 @@ async function publish2CMR(cmrPublishObject, creds, cmrRevisionId) { | |
throw new Error(`invalid cmrPublishObject passed to publis2CMR ${JSON.stringify(cmrPublishObject)}`); | ||
} | ||
|
||
/** | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a reason why this wasn't here before? did we never have the case of having to remove from CMR until now or was there just another way to do it (via API or something) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mainly for my curiosity I guess There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. api package has _removeGranuleFromCmr which does more stuff There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so this is like a mini-function of that kinda where it only deletes, sounds good and pretty useful maybe 👍 |
||
* Remove granule from CMR. | ||
* | ||
* @param {string} granuleUR - the granuleUR | ||
* @param {CmrCredentials} creds - credentials needed to post to CMR service | ||
*/ | ||
async function removeFromCMR(granuleUR, creds) {
  // Build a CMR client from the supplied credentials, issue the delete for
  // the given granuleUR, and hand the awaited result back unchanged.
  const client = new CMR(creds);
  const deleteResult = await client.deleteGranule(granuleUR);
  return deleteResult;
}
|
||
/** | ||
* Returns the S3 object identified by the specified S3 URI and (optional) | ||
* entity tag, retrying up to 5 times, if necessary. | ||
|
@@ -1249,6 +1270,7 @@ module.exports = { | |
publish2CMR, | ||
reconcileCMRMetadata, | ||
removeEtagsFromFileObjects, | ||
removeFromCMR, | ||
updateCMRMetadata, | ||
uploadEcho10CMRFile, | ||
uploadUMMGJSONCMRFile, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,6 @@ | |
], | ||
"statements": 94.0, | ||
"functions": 88.0, | ||
"branches": 97.0, | ||
"branches": 88.0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only the handler function is not covered as before, not sure how to improve this. |
||
"lines": 94.0 | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,14 @@ | ||
'use strict'; | ||
|
||
const keyBy = require('lodash/keyBy'); | ||
const pMap = require('p-map'); | ||
const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); | ||
const { | ||
addEtagsToFileObjects, | ||
granulesToCmrFileObjects, | ||
metadataObjectFromCMRFile, | ||
publish2CMR, | ||
removeFromCMR, | ||
removeEtagsFromFileObjects, | ||
} = require('@cumulus/cmrjs'); | ||
const { getCmrSettings, getS3UrlOfFile } = require('@cumulus/cmrjs/cmr-utils'); | ||
|
@@ -85,6 +87,28 @@ function checkForMetadata(granules, cmrFiles) { | |
}); | ||
} | ||
|
||
/**
 * Remove already-published granules from CMR so they can be republished.
 *
 * A granule is selected for removal when its `published` flag is truthy or it
 * carries a `cmrLink`. In theory a published granule always has a cmrLink, but
 * either signal is accepted so the granule can still be deleted from CMR when
 * the link is missing.
 *
 * @param {object} params - parameter object
 * @param {Array<object>} params.granules - granules to consider for removal
 * @param {object} params.cmrSettings - CMR credentials passed to removeFromCMR
 * @param {number} params.concurrency - Maximum concurrency of requests to CMR
 * @returns {Promise<void>} resolves once every selected granule has been removed
 * @throws {Error} - Error from CMR request
 */
async function removeGranuleFromCmr({ granules, cmrSettings, concurrency }) {
  const granulesToUnpublish = granules.filter(
    (granule) => granule.published || !!granule.cmrLink
  );

  // Nothing is published, so there is nothing to delete or report.
  if (granulesToUnpublish.length === 0) return;

  // Log before issuing the deletes so the message is still emitted when one
  // of the CMR requests fails and the function rejects.
  log.info(`Removing ${granulesToUnpublish.length} out of ${granules.length} granules from CMR for republishing`);

  await pMap(
    granulesToUnpublish,
    (granule) => removeFromCMR(granule.granuleId, cmrSettings),
    { concurrency }
  );
}
|
||
/** | ||
* Post to CMR | ||
* | ||
|
@@ -108,7 +132,17 @@ function checkForMetadata(granules, cmrFiles) { | |
*/ | ||
async function postToCMR(event) { | ||
const { cmrRevisionId, granules } = event.input; | ||
const { etags = {} } = event.config; | ||
const { etags = {}, republish = false, concurrency = 20 } = event.config; | ||
|
||
const cmrSettings = await getCmrSettings({ | ||
...event.config.cmr, | ||
...event.config.launchpad, | ||
}); | ||
|
||
// if republish is true, unpublish granules which are public | ||
if (republish) { | ||
await removeGranuleFromCmr({ granules, cmrSettings, concurrency }); | ||
} | ||
|
||
granules.forEach((granule) => addEtagsToFileObjects(granule, etags)); | ||
|
||
|
@@ -122,18 +156,14 @@ async function postToCMR(event) { | |
|
||
const startTime = Date.now(); | ||
|
||
const cmrSettings = await getCmrSettings({ | ||
...event.config.cmr, | ||
...event.config.launchpad, | ||
}); | ||
|
||
// post all meta files to CMR | ||
const results = await Promise.all( | ||
updatedCMRFiles.map((cmrFile) => publish2CMR(cmrFile, cmrSettings, cmrRevisionId)) | ||
const results = await pMap( | ||
updatedCMRFiles, | ||
(cmrFile) => publish2CMR(cmrFile, cmrSettings, cmrRevisionId), | ||
{ concurrency } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Jonathan suggested to have the promise.allsettled behavior, which is adding There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ohhh ok, that makes sense, thanks for lmk 🙌 |
||
); | ||
|
||
const endTime = Date.now(); | ||
|
||
const outputGranules = buildOutput( | ||
results, | ||
granules | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -108,7 +108,7 @@ test.serial('postToCMR throws error if CMR correctly identifies the xml as inval | |
} | ||
}); | ||
|
||
test.serial('postToCMR fails when CMR is down', async (t) => { | ||
test.serial('postToCMR fails to publish granules when CMR is down', async (t) => { | ||
sinon.stub(cmrClient.CMR.prototype, 'getToken'); | ||
const { bucket, payload } = t.context; | ||
const newPayload = payload; | ||
|
@@ -133,6 +133,27 @@ test.serial('postToCMR fails when CMR is down', async (t) => { | |
} | ||
}); | ||
|
||
// Verifies that postToCMR rejects with CMRInternalError when republish is
// enabled and the CMR delete request fails.
test.serial('postToCMR fails to republish granules when CMR is down', async (t) => {
  // Register each restore immediately after its stub is installed so that a
  // failure anywhere later in the test cannot leak a stubbed method into the
  // following serial tests.
  sinon.stub(cmrClient.CMR.prototype, 'getToken');
  t.teardown(() => {
    cmrClient.CMR.prototype.getToken.restore();
  });

  sinon.stub(cmrClient.CMR.prototype, 'deleteGranule').throws(new CMRInternalError());
  t.teardown(() => {
    cmrClient.CMR.prototype.deleteGranule.restore();
  });

  const newPayload = cloneDeep(t.context.payload);
  newPayload.config.republish = true;
  newPayload.config.concurrency = 2;
  // Mark the first granule as published so the republish path tries to delete it.
  newPayload.input.granules[0].published = true;
  // NOTE(review): randomString is assigned without being called; if it is a
  // generator function this stores the function itself rather than a string - confirm.
  newPayload.input.granules[0].cmrLink = randomString;

  await t.throwsAsync(postToCMR(newPayload), { instanceOf: CMRInternalError });
});
|
||
test.serial('postToCMR raises correct error', async (t) => { | ||
sinon.stub(cmrClient.CMR.prototype, 'getToken'); | ||
const { bucket, payload } = t.context; | ||
|
@@ -190,6 +211,45 @@ test.serial('postToCMR succeeds with correct payload', async (t) => { | |
}); | ||
}); | ||
|
||
// Happy-path republish: the first granule is flagged as published, the CMR
// delete and ingest calls are stubbed, and the task output is validated.
test.serial('postToCMR successfully republishes granules with correct payload', async (t) => {
  const { bucket, payload } = t.context;
  const newPayload = cloneDeep(payload);
  Object.assign(newPayload.config, { concurrency: 2, republish: true });

  const [publishedGranule] = newPayload.input.granules;
  publishedGranule.published = true;
  publishedGranule.cmrLink = randomString;

  const metadataKey = `${publishedGranule.granuleId}.cmr.xml`;

  const cmrPrototype = cmrClient.CMR.prototype;
  sinon.stub(cmrPrototype, 'deleteGranule');
  sinon.stub(cmrPrototype, 'ingestGranule').callsFake(resultThunk);
  t.teardown(() => {
    cmrClient.CMR.prototype.deleteGranule.restore();
    cmrClient.CMR.prototype.ingestGranule.restore();
  });

  // postToCMR reads the CMR metadata file and ingests it to CMR, so the
  // fixture has to be uploaded to the bucket before the task runs.
  const fixturePath = path.join(path.dirname(__filename), 'data', 'meta.xml');
  await s3PutObject({
    Bucket: bucket,
    Key: metadataKey,
    Body: fs.createReadStream(fixturePath),
  });

  await validateInput(t, newPayload.input);
  await validateConfig(t, newPayload.config);
  const output = await postToCMR(newPayload);
  await validateOutput(t, output);

  const expectedLink = `https://cmr.uat.earthdata.nasa.gov/search/concepts/${result['concept-id']}.echo10`;
  t.is(output.granules.length, 1);
  t.is(output.granules[0].cmrLink, expectedLink);

  // Every output granule must report a non-negative integer duration.
  for (const outputGranule of output.granules) {
    t.true(Number.isInteger(outputGranule.post_to_cmr_duration));
    t.true(outputGranule.post_to_cmr_duration >= 0);
  }
});
|
||
test.serial('postToCMR immediately succeeds using metadata file ETag', async (t) => { | ||
const newPayload = cloneDeep(t.context.payload); | ||
const granuleId = newPayload.input.granules[0].granuleId; | ||
|
Uh oh!
There was an error while loading. Please reload this page.