diff --git a/package.json b/package.json index c799f29..2fe25e3 100644 --- a/package.json +++ b/package.json @@ -119,4 +119,4 @@ "resolutions": { "@oada/jobs": "portal:/home/sanoel/jobs" } -} \ No newline at end of file +} diff --git a/src/config.ts b/src/config.ts index 076fcb2..7ecdecf 100644 --- a/src/config.ts +++ b/src/config.ts @@ -59,6 +59,14 @@ const config = convict({ }, }, timeouts: { + simx: { + doc: 'Timeout duration for Target themselves', + format: 'duration', + // The types for duration suck + default: 3_600_000 as unknown as number, + env: 'SIMX_TIMEOUT', + arg: 'simx-timeout', + }, pdf: { doc: 'Timeout duration for PDF jobs', format: 'duration', diff --git a/src/index.ts b/src/index.ts index 4fdba9a..f09ee7b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -76,8 +76,8 @@ await Promise.all( }); // 1 concurrent job // -------------------------------------------------- - // Set the job type handlers - service.on('transcription', config.get('timeouts.pdf'), pdfJobHandler); + // Set the job type handlers; timeout the job 2 minutes longer than target + service.on('transcription', config.get('timeouts.pdf') + 120_000, pdfJobHandler); // Service.on('asn', config.get('timeouts.asn'), asnJobHandler); // -------------------------------------------------- diff --git a/src/pdfJob.ts b/src/pdfJob.ts index 1197163..0eca26a 100644 --- a/src/pdfJob.ts +++ b/src/pdfJob.ts @@ -28,10 +28,9 @@ import moment from 'moment'; import oError from '@overleaf/o-error'; import type { Change, JsonObject, OADAClient } from '@oada/client'; -import type { Json, Logger, WorkerFunction } from '@oada/jobs'; -import type { JWK } from '@trellisfw/signatures'; -import type Job from '@oada/types/oada/service/job.js'; -import type Jobs from '@oada/types/oada/service/jobs.js'; +import type { Job, Json, Logger, WorkerFunction } from '@oada/jobs'; import type { JWK } from '@trellisfw/signatures'; +//import type Job from '@oada/types/oada/service/job.js'; +//import type Jobs from '@oada/types/oada/service/jobs.js'; import type { Link } from '@oada/types/oada/link/v1.js'; import { AssumeState, ChangeType, ListWatch } from '@oada/list-lib'; import { Gauge } from '@oada/lib-prom'; @@ -55,6 +54,7 @@ const trace = debug('target-helper:trace'); const pending = '/bookmarks/services/target/jobs/pending'; +const targetTimeout = config.get('timeouts.simx'); const jobs = new Gauge({ name: 'target_helper_jobs', help: 'Number of jobs in the pending queue', @@ -209,572 +209,565 @@ export const jobHandler: WorkerFunction = async (job, { jobId, log, oada }) => { }; } - return new Promise(async (resolve, reject) => { - try { - // - once it sees "success" in the updates, - // it will post a job to trellis-signer and notify oada-jobs of success - const targetSuccess = async () => { - log.info( - 'helper-started', - 'Target returned success, target-helper picking up', - ); - - // Get the latest copy of job - const r = await oada.get({ path: `/${jobId}` }); - assertJob(r.data); // TODO: This is already done in the jobs library? - // FIXME: Make a proper type and assert - // Note the types *should* be okay at runtime because these are needed to get to a target success - const job = r.data as typeof r.data & { - targetResult: List>; - config: { - 'pdf': Link; - 'oada-doc-type': string; - 'document': Resource; - 'docKey': string; - }; - result: List>; - }; - - // FIXME: Fail job if not exist? - const pdfID = job.config.pdf._id; - - // ------------- 0: Construct the result from the targetResult - // If/when a result that matches the input partial json is found, - // merge the resource created by target into the partial json. - // Do this all first and store it as the result for the remaining steps - // to use. - job.result = {}; - - for await (const [documentType, data] of Object.entries( - job.targetResult, - )) { - info('Document identified as %s', documentType); - - job.result[documentType] = {}; - - for await (const documentData of Object.values(data)) { - // If the document type mismatches, move the link to the right place and let the flow start over. - // Ex: LaserFiche "unidentified" documents, FoodLogiq wrong PDF uploaded, etc. - // Note: One day we might officially separate "identification" from "transcription". This is almost that. - // TODO: Reconsider this as it doesn't really work well with jobs created outside of the helper watches - if ( - // Added the 'unidentified' check to stop doing this flow for misidentified - // documents coming through e.g. fl-sync. Theres just too much cross over of - // one doc type used to satisfy another doc type. e.g. we may ask a product label to be extract it - // and target may recognize it as nutrition information because it contains both. This condition - // lets us continue doing this for jobs specifically coming in as unidentified - job.config['oada-doc-type'] === 'unidentified' && - job.config['oada-doc-type'] !== documentType && - !matchesAlternateUrlNames( - job.config['oada-doc-type'], - documentType, - ) - ) { - info( - 'Document type mismatch. Trellis: [%s], Target: [%s]. Moving tree location and bailing.', - documentType, - job.config['oada-doc-type'], - ); + return handleJob({ jobId, log, oada }); +} - trace(`Removing from ${job.config['oada-doc-type']} list.`); - await oada.delete({ - path: join( - '/bookmarks/trellisfw/documents', - job.config['oada-doc-type'], - job.config.docKey, - ), - }); +async function targetSuccess({ + jobId, + log, + oada, +} : { + jobId: string, + log: Logger, + oada: OADAClient, +}): Promise { + log.info( + 'helper-started', + 'Target returned success, target-helper picking up', + ); - const newType = fromOadaType(documentType)?.type; + // Get the latest copy of job + const r = await oada.get({ path: `/${jobId}` }); + assertJob(r.data); // TODO: This is already done in the jobs library? + // FIXME: Make a proper type and assert + // Note the types *should* be okay at runtime because these are needed to get to a target success + const job = r.data as typeof r.data & { + targetResult: List>; + config: { + 'pdf': Link; + 'oada-doc-type': string; + 'document': Resource; + 'docKey': string; + }; + result: List>; + }; - trace(`Updating resource type to ${newType}`); - await oada.put({ - path: job.config.document._id, - data: { _type: newType }, - }); - await oada.put({ - path: join(job.config.document._id, '_meta'), - data: { _type: newType }, - }); + // FIXME: Fail job if not exist? + const pdfID = job.config.pdf._id; + + // ------------- 0: Construct the result from the targetResult + // If/when a result that matches the input partial json is found, + // merge the resource created by target into the partial json. + // Do this all first and store it as the result for the remaining steps + // to use. + job.result = {}; + + for await (const [documentType, data] of Object.entries( + job.targetResult, + )) { + info('Document identified as %s', documentType); + + job.result[documentType] = {}; + + for await (const documentData of Object.values(data)) { + // If the document type mismatches, move the link to the right place and let the flow start over. + // Ex: LaserFiche "unidentified" documents, FoodLogiq wrong PDF uploaded, etc. + // Note: One day we might officially separate "identification" from "transcription". This is almost that. + // TODO: Reconsider this as it doesn't really work well with jobs created outside of the helper watches + if ( + // Added the 'unidentified' check to stop doing this flow for misidentified + // documents coming through e.g. fl-sync. Theres just too much cross over of + // one doc type used to satisfy another doc type. e.g. we may ask a product label to be extract it + // and target may recognize it as nutrition information because it contains both. This condition + // lets us continue doing this for jobs specifically coming in as unidentified + job.config['oada-doc-type'] === 'unidentified' && + job.config['oada-doc-type'] !== documentType && + !matchesAlternateUrlNames( + job.config['oada-doc-type'], + documentType, + ) + ) { + info( + 'Document type mismatch. Trellis: [%s], Target: [%s]. Moving tree location and bailing.', + documentType, + job.config['oada-doc-type'], + ); - trace(`Putting document into ${documentType} list.`); - await oada.put({ - tree, - path: join('/bookmarks/trellisfw/documents/', documentType), - data: { - [job.config.docKey]: { _id: job.config.document._id, rev: 0 }, - }, - }); + trace(`Removing from ${job.config['oada-doc-type']} list.`); + await oada.delete({ + path: join( + '/bookmarks/trellisfw/documents', + job.config['oada-doc-type'], + job.config.docKey, + ), + }); - void log.info( - 'done', - 'Document moved for proper doctype for re-processing.', - ); - resolve(job.result as Json); - return; - } + const newType = fromOadaType(documentType)?.type; - trace( - 'Merging from %s to %s.', - documentData._id, - job.config.document._id, - ); - const { data } = await oada.get({ path: documentData._id }); - await oada.put({ - path: job.config.document._id, - data: stripResource(data as JsonObject), - }); - - job.result[documentType] = { - ...job.result[documentType], - // Write the target data to the original document - [job.config.docKey]: { _id: job.config.document._id }, - }; - } - } + trace(`Updating resource type to ${newType}`); + await oada.put({ + path: job.config.document._id, + data: { _type: newType }, + }); + await oada.put({ + path: join(job.config.document._id, '_meta'), + data: { _type: newType }, + }); - // Record the result in the job + trace(`Putting document into ${documentType} list.`); await oada.put({ - path: `/${jobId}`, + tree, + path: join('/bookmarks/trellisfw/documents/', documentType), data: { - result: job.result as Json, + [job.config.docKey]: { _id: job.config.document._id, rev: 0 }, }, }); + void log.info( - 'helper: stored result after processing targetResult', - {}, + 'done', + 'Document moved for proper doctype for re-processing.', ); + jobs.dec(); + return job.result as Json; + } - /* Target-helper not /expects/ this and does not need to re-create it (wrongly) + trace( + 'Merging from %s to %s.', + documentData._id, + job.config.document._id, + ); + const { data } = await oada.get({ path: documentData._id }); + await oada.put({ + path: job.config.document._id, + data: stripResource(data as JsonObject), + }); - // ------------- 1: Look through all the things in result recursively, - // if any are links then go there and set _meta/vdoc/pdf - // Also, write a reference to the target-job that did it. This serves to - // tell our ListWatch not to queue a target job when we add the - // resource to /shared/trellisfw/documents/ + job.result[documentType] = { + ...job.result[documentType], + // Write the target data to the original document + [job.config.docKey]: { _id: job.config.document._id }, + }; + } + } - /* The _meta/vdoc/pdf/[hash] already exists under the new target-helper flow - void log.info( - 'helper: linking _meta/vdoc/pdf for each link in result', - {} - ); + // Record the result in the job + await oada.put({ + path: `/${jobId}`, + data: { + result: job.result as Json, + }, + }); + void log.info( + 'helper: stored result after processing targetResult', + {}, + ); - let pdfKey = (pdfID || '').replace(/^resources\//, ''); - async function recursivePutVdocAtLinks(object: unknown): Promise { - if (typeof object !== 'object' || !object) { - return; - } + // ------------- 2: sign audit/coi/etc. + void log.info('helper: signing all links in result', {}); + async function recursiveSignLinks(object: unknown): Promise { + if (typeof object !== 'object' || !object) { + return; + } - if (has(object, '_id')) { - await oada.put({ - path: `/${object._id}/_meta`, - data: { - vdoc: { - pdf: { - [pdfKey]: { _id: `${pdfID}` }, - }, - }, - }, - }); - await oada.put({ - path: `/${object._id}/_meta`, - data: { - services: { - target: { - jobs: { - [jobId]: { _ref: `resources/${jobId}` }, - }, - }, - }, - }, - }); - return; - } + if (has(object, '_id')) { + await signResourceForTarget({ + _id: object._id as string, + // Hack because jobs is on ancient client version + oada: oada as unknown as OADAClient, + log, + }); + return; + } - for await (const value of Object.values(object)) { - await recursivePutVdocAtLinks(value); - } + for await (const value of Object.values(object)) { + await recursiveSignLinks(value); + } + } + + await recursiveSignLinks(job.result); + + // ------------- 3: put audits/cois/etc. to proper home + const versionedResult = recursiveMakeAllLinksVersioned( + job.result, + ) as Record>; + trace(versionedResult, 'all versioned links to bookmarks'); + + // ------------- 4: cross link vdoc for pdf <-> audits,cois,letters,etc. + void log.info( + 'link-refs-pdf', + 'helper: linking result _refs under /_meta/vdoc', + ); + + const vdoc = recursiveReplaceLinksWithReferences(job.result); + info( + "Linking _ref's into pdf/_meta, job.result before: %O, after: %O", + job.result, + vdoc, + ); + await oada.put({ + path: `/${pdfID}/_meta`, + data: { + // Fsqa-audits { ...links... }, or fsqa-certificates { ...links... }, etc. + vdoc: vdoc as Json, + }, + }); + + // HARDCODED UNTIL AINZ IS UPDATED: + // ------------- 6: lookup shares, post job to shares service + // Only share for smithfield + // why 'trading-partner' here with result + const result = job['trading-partner'] ? {} : job.result ?? {}; + + for await (const doctype of Object.keys(result)) { + const shares: Shares = []; + // Results are lists of links? + const doclist = (job.result?.[doctype] ?? {}) as Record< + string, + { _id: string } + >; + for await (const [dockey, document] of Object.entries(doclist)) { + trace( + 'Fetching lookups for doctype = %s, doc = %O, getting /%s/_meta/lookups', + doctype, + document, + document._id, + ); + const { data: lookups } = (await oada.get({ + path: `/${document._id}/_meta/lookups`, + })) as unknown as { + data: Record>; + }; + trace(lookups, 'lookups'); + let facilityID; + switch (doctype) { + case 'fsqa-audits': { + facilityID = lookups['fsqa-audit']!.organization!._ref; + await pushSharesForFacility({ + facilityid: facilityID, + doc: document, + dockey, + shares, + }); + break; } - await recursivePutVdocAtLinks(job.result); - */ + case 'fsqa-certificates': { + facilityID = lookups['fsqa-certificate']!.organization!._ref; + await pushSharesForFacility({ + facilityid: facilityID, + doc: document, + dockey, + shares, + }); + break; + } - // ------------- 2: sign audit/coi/etc. - void log.info('helper: signing all links in result', {}); - async function recursiveSignLinks(object: unknown): Promise { - if (typeof object !== 'object' || !object) { - return; - } + case 'cois': { + const { data: holder } = (await oada.get({ + path: `/${lookups.coi!.holder!._ref}`, + })) as unknown as { + data: { 'trading-partners': Record }; + }; + trace({ holder }, 'Retrieved holder'); + for await (const tpLink of Object.values( + holder['trading-partners'], + )) { + const { data: tp } = await oada.get({ + path: `/${tpLink._id}`, + }); + if (!has(tp, '_id')) { + throw new Error( + `Expected _id on trading partner ${tpLink._id}`, + ); + } - if (has(object, '_id')) { - await signResourceForTarget({ - _id: object._id as string, - // Hack because jobs is on ancient client version - oada: oada as unknown as OADAClient, - log, + shares.push({ + tp: { _id: `${tp._id}` }, + doc: document, + dockey, }); - return; } - for await (const value of Object.values(object)) { - await recursiveSignLinks(value); + break; + } + + case 'letters-of-guarantee': { + const { data: buyer } = (await oada.get({ + path: `/${lookups['letter-of-guarantee']!.buyer!._ref}`, + })) as unknown as { + data: { 'trading-partners': Record }; + }; + trace('Retrieved buyer %O', buyer); + for await (const tpLink of Object.values( + buyer['trading-partners'], + )) { + const { data: tp } = (await oada.get({ + path: `/${tpLink._id}`, + })) as unknown as { data: { _id: string } }; + shares.push({ tp, doc: document, dockey }); } + + break; } - await recursiveSignLinks(job.result); + default: { + throw new Error( + `Unknown document type (${doctype}) when attempting to do lookups`, + ); + } + } + } - // ------------- 3: put audits/cois/etc. to proper home - const versionedResult = recursiveMakeAllLinksVersioned( - job.result, - ) as Record>; - trace(versionedResult, 'all versioned links to bookmarks'); + void log.info( + 'sharing', + `Posting ${shares.length} shares jobs for doctype ${doctype} resulting from this transcription`, + ); + for await (const { dockey, doc, tp } of shares) { + const tpKey = tp._id.replace(/^resources\//, ''); + const { data: user } = await oada.get({ path: `/${tp._id}/user` }); + if (user instanceof Uint8Array) { + throw new TypeError('user was not JSON'); + } - // ------------- 4: cross link vdoc for pdf <-> audits,cois,letters,etc. - void log.info( - 'link-refs-pdf', - 'helper: linking result _refs under /_meta/vdoc', + // HACK FOR DEMO UNTIL WE GET MASKING SETTINGS: + let mask: + | { keys_to_mask: string[]; generate_pdf: boolean } + | boolean = false; + if (tpKey.includes('REDDYRAW')) { + info('COPY WILL MASK LOCATIONS FOR REDDYRAW'); + trace( + 'pdf is only generated for fsqa-audits or cois, doctype is %s', + doctype, ); + mask = { + keys_to_mask: ['location'], + generate_pdf: doctype === 'fsqa-audits' || doctype === 'cois', + }; + } - const vdoc = recursiveReplaceLinksWithReferences(job.result); - info( - "Linking _ref's into pdf/_meta, job.result before: %O, after: %O", - job.result, - vdoc, - ); - await oada.put({ - path: `/${pdfID}/_meta`, - data: { - // Fsqa-audits { ...links... }, or fsqa-certificates { ...links... }, etc. - vdoc: vdoc as Json, + // END HACK FOR DEMO + const { + headers: { 'content-location': location }, + } = await oada.post({ + path: `/resources`, + contentType: + tree.bookmarks?.services?.['*']?.jobs?.pending?.['*']?._type, + data: { + type: 'share-user-link', + service: 'trellis-shares', + config: { + src: `/${doc._id}`, + copy: { + // If "copy" key is not present it will link to original rather than copy + // copy full original as-is (rather than some subset of keys/paths). + // Note that this will screw up the signature if set to anything other than true. Also, ignored if mask is truthy since original must exist unmasked. + original: true, + meta: { vdoc: true }, // Copy only the vdoc path from _meta for the copy + mask, + }, + doctype, // Fsqa-audits, cois, fsqa-certificates, letters-of-guarantee + dest: `/bookmarks/trellisfw/${doctype}/${dockey}`, // This doubles-up bookmarks, but I think it's the most understandable to look at + user: user as unknown as any, // { id, bookmarks } + chroot: `/bookmarks/trellisfw/trading-partners/${tpKey}/user/bookmarks`, + tree: treeForDocumentType(doctype), }, - }); + }, + }); + const resourceID = location?.replace(/^\//, ''); + const reskey = resourceID?.replace(/^resources\//, ''); + trace('Shares job posted as resId = %s', resourceID); + const { + headers: { 'content-location': jobpath }, + } = await oada.put({ + path: `/bookmarks/services/trellis-shares/jobs`, + data: { [reskey!]: { _id: resourceID, _rev: 0 } }, + tree, + }); + const jobkey = jobpath?.replace(/^\/resources\/[^/]+\//, ''); + trace('Posted jobkey %s for shares', jobkey); + } + } - // HARDCODED UNTIL AINZ IS UPDATED: - // ------------- 6: lookup shares, post job to shares service - // Only share for smithfield - // why 'trading-partner' here with result - const result = job['trading-partner'] ? {} : job.result ?? {}; - - for await (const doctype of Object.keys(result)) { - const shares: Shares = []; - // Results are lists of links? - const doclist = (job.result?.[doctype] ?? {}) as Record< - string, - { _id: string } - >; - for await (const [dockey, document] of Object.entries(doclist)) { - trace( - 'Fetching lookups for doctype = %s, doc = %O, getting /%s/_meta/lookups', - doctype, - document, - document._id, - ); - const { data: lookups } = (await oada.get({ - path: `/${document._id}/_meta/lookups`, - })) as unknown as { - data: Record>; - }; - trace(lookups, 'lookups'); - let facilityID; - switch (doctype) { - case 'fsqa-audits': { - facilityID = lookups['fsqa-audit']!.organization!._ref; - await pushSharesForFacility({ - facilityid: facilityID, - doc: document, - dockey, - shares, - }); - break; - } - - case 'fsqa-certificates': { - facilityID = lookups['fsqa-certificate']!.organization!._ref; - await pushSharesForFacility({ - facilityid: facilityID, - doc: document, - dockey, - shares, - }); - break; - } - - case 'cois': { - const { data: holder } = (await oada.get({ - path: `/${lookups.coi!.holder!._ref}`, - })) as unknown as { - data: { 'trading-partners': Record }; - }; - trace({ holder }, 'Retrieved holder'); - for await (const tpLink of Object.values( - holder['trading-partners'], - )) { - const { data: tp } = await oada.get({ - path: `/${tpLink._id}`, - }); - if (!has(tp, '_id')) { - throw new Error( - `Expected _id on trading partner ${tpLink._id}`, - ); - } - - shares.push({ - tp: { _id: `${tp._id}` }, - doc: document, - dockey, - }); - } - - break; - } - - case 'letters-of-guarantee': { - const { data: buyer } = (await oada.get({ - path: `/${lookups['letter-of-guarantee']!.buyer!._ref}`, - })) as unknown as { - data: { 'trading-partners': Record }; - }; - trace('Retrieved buyer %O', buyer); - for await (const tpLink of Object.values( - buyer['trading-partners'], - )) { - const { data: tp } = (await oada.get({ - path: `/${tpLink._id}`, - })) as unknown as { data: { _id: string } }; - shares.push({ tp, doc: document, dockey }); - } - - break; - } - - default: { - throw new Error( - `Unknown document type (${doctype}) when attempting to do lookups`, - ); - } - } - } + void log.info('done', 'Completed all helper tasks'); - void log.info( - 'sharing', - `Posting ${shares.length} shares jobs for doctype ${doctype} resulting from this transcription`, - ); - for await (const { dockey, doc, tp } of shares) { - const tpKey = tp._id.replace(/^resources\//, ''); - const { data: user } = await oada.get({ path: `/${tp._id}/user` }); - if (user instanceof Uint8Array) { - throw new TypeError('user was not JSON'); - } + jobs.dec(); + return job.result as Json; +} - // HACK FOR DEMO UNTIL WE GET MASKING SETTINGS: - let mask: - | { keys_to_mask: string[]; generate_pdf: boolean } - | boolean = false; - if (tpKey.includes('REDDYRAW')) { - info('COPY WILL MASK LOCATIONS FOR REDDYRAW'); - trace( - 'pdf is only generated for fsqa-audits or cois, doctype is %s', - doctype, - ); - mask = { - keys_to_mask: ['location'], - generate_pdf: doctype === 'fsqa-audits' || doctype === 'cois', - }; - } +async function handleTargetTimeout({ + jobId, + oada, +} : { + jobId: string, + oada: OADAClient, +}) { + const { data } = await oada.get({path: `/${jobId}`}) as unknown as { data: Job }; + if (!data?.status) { + // Handle timeout here + await oada.post({ + path: `/${jobId}/updates`, + data: { + status: 'error', + information: 'TimeoutError', + }, + }); + // You can also throw an error or perform any other necessary actions + } +} - // END HACK FOR DEMO - const { - headers: { 'content-location': location }, - } = await oada.post({ - path: `/resources`, - contentType: - tree.bookmarks?.services?.['*']?.jobs?.pending?.['*']?._type, - data: { - type: 'share-user-link', - service: 'trellis-shares', - config: { - src: `/${doc._id}`, - copy: { - // If "copy" key is not present it will link to original rather than copy - // copy full original as-is (rather than some subset of keys/paths). - // Note that this will screw up the signature if set to anything other than true. Also, ignored if mask is truthy since original must exist unmasked. - original: true, - meta: { vdoc: true }, // Copy only the vdoc path from _meta for the copy - mask, - }, - doctype, // Fsqa-audits, cois, fsqa-certificates, letters-of-guarantee - dest: `/bookmarks/trellisfw/${doctype}/${dockey}`, // This doubles-up bookmarks, but I think it's the most understandable to look at - user: user as unknown as any, // { id, bookmarks } - chroot: `/bookmarks/trellisfw/trading-partners/${tpKey}/user/bookmarks`, - tree: treeForDocumentType(doctype), - }, - }, - }); - const resourceID = location?.replace(/^\//, ''); - const reskey = resourceID?.replace(/^resources\//, ''); - trace('Shares job posted as resId = %s', resourceID); - const { - headers: { 'content-location': jobpath }, - } = await oada.put({ - path: `/bookmarks/services/trellis-shares/jobs`, - data: { [reskey!]: { _id: resourceID, _rev: 0 } }, - tree, - }); - const jobkey = jobpath?.replace(/^\/resources\/[^/]+\//, ''); - trace('Posted jobkey %s for shares', jobkey); - } - } +async function jobChange({ + jobId, + log, + oada, + c, + unwatch, +} : { + jobId: string, + log: Logger, + oada: OADAClient, + c: Omit, + unwatch: () => Promise +}) { + trace('#jobChange: received change, c = %O ', c); + if (c.path !== '') { + return; // Child + } - void log.info('done', 'Completed all helper tasks'); - resolve(job.result as Json); - }; + if (c.type !== 'merge') { + return; // Delete + } - const jobChange = async (c: Omit) => { - try { - trace('#jobChange: received change, c = %O ', c); - // Look through all the changes, only do things if it sends a "success" or "error" update status - if (c.path !== '') { - return; // Child - } + const { updates } = (c.body ?? {}) as { + updates?: Record; + }; + if (!updates) { + return; // Not an update from target + } - if (c.type !== 'merge') { - return; // Delete - } + trace('#jobChange: it is a change we want (has an update)'); + for await (const v of Object.values(updates)) { + // We have one change that has time as a stringified unix timestamp. + // I think target posts it: "information": "Job result loaded to resource: '281aZusXonG7b7ZYY5w8TCtedcZ'", + // "time": "1650386464" + // It throws a horrible deprecation warning, so address it here. + if (Number.parseInt(v.time, 10)) { + v.time = moment(Number.parseInt(v.time, 10) * 1000).toISOString(); + } - const { updates } = (c.body ?? {}) as { - updates?: Record; - }; - if (!updates) { - return; // Not an update from target - } + const t = clone(v.time); + v.time = moment(v.time).toISOString(); + if (v.time === null) { + // @ts-expect-error --- ? + v.time = moment(t, 'X'); + } - trace('#jobChange: it is a change we want (has an update)'); - for await (const [k, v] of Object.entries(updates)) { - // We have one change that has time as a stringified unix timestamp. - // I think target posts it: "information": "Job result loaded to resource: '281aZusXonG7b7ZYY5w8TCtedcZ'", - // "time": "1650386464" - // It throws a horrible deprecation warning, so address it here. - if (Number.parseInt(v.time, 10)) { - v.time = moment(Number.parseInt(v.time, 10) * 1000).toISOString(); - } + // Fix for Target identifying loop + if (v.status === 'identifying') { + setTimeout(async () => { + await handleTargetTimeout({ jobId, oada }) + await unwatch(); + }, targetTimeout) + } - const t = clone(v.time); - v.time = moment(v.time).toISOString(); - if (v.time === null) { - // @ts-expect-error --- ? - v.time = moment(t, 'X'); - } - /* - If ( - v.information === - 'File is not a Textual PDF,requires OCR to be processed.' - ) { - await oada.post({ - path: `/${jobId}/updates`, - data: { - status: 'error', - information: v.information, - }, - }); - } - */ - - // Fix for Target identifying loop - if (v.status === 'identifying') { - const otherUpdates = await oada - .get({ - path: `/${jobId}/updates`, - }) - .then((r: any) => r.data as unknown as JsonObject); - - const others = Object.values(otherUpdates).filter( - (object: any) => - object.status && object.status === 'identifying', - ); - if (others.length > 5) { - info( - `Job ${jobId} stuck in 'identifying' loop for more than 5 minutes. Timing out.`, - ); - await oada.post({ - path: `/${jobId}/updates`, - data: { - status: 'error', - information: 'TimeoutError', - }, - }); - } else { - trace( - `Job ${jobId} update status 'identifying' happened less than 10 times.`, - ); - } - } + trace(v, '#jobChange: change update'); + if (v.status === 'success') { + trace( + '#jobChange: unwatching job and moving on with success tasks', + ); + await unwatch(); + return targetSuccess({ + jobId, + log, + oada, + }); + } - trace(v, '#jobChange: change update'); - if (v.status === 'success') { - trace( - '#jobChange: unwatching job and moving on with success tasks', - ); - await unwatch(); - // @ts-expect-error TODO: Why pass stuff to this function with no arguments? + if (v.status === 'error') { + error( + '#jobChange: unwatching job and moving on with error tasks', + ); + await unwatch(); + if (v.information) { + error('Target job [%s] failed: %O', jobId, v.information); + } - await targetSuccess({ update: v, key: k, change: c }); - jobs.dec(); - } + throw new Error( + `Target returned error: ${JSON.stringify(v, undefined, ' ')}`, + ); + } + } - if (v.status === 'error') { - error( - '#jobChange: unwatching job and moving on with error tasks', - ); - await unwatch(); - if (v.information) { - error('Target job [%s] failed: %O', jobId, v.information); - } + return; +} - throw new Error( - `Target returned error: ${JSON.stringify(v, undefined, ' ')}`, - ); - } - } - } catch (cError: unknown) { - reject(cError); - } - }; +async function handleJob({ + jobId, + log, + oada, +} : { + jobId: string, + log: Logger, + oada: OADAClient + // @ts-expect-error rewrite differently? +}) : Promise { + try { + // - once it sees "success" in the updates, + // it will post a job to trellis-signer and notify oada-jobs of success - const { changes } = await oada.watch({ - path: `${pending}/${jobKey}`, - type: 'single', - }); - const { data } = await oada.get({ - path: `${pending}/${jobKey}`, - }); - // Initially just the original job is the "body" for a synthetic change - const w = data as Change['body']; + const { changes } = await oada.watch({ + path: `/${jobId}`, + type: 'single', + }); - const unwatch = async () => { - await changes.return?.(); - // Await oada.unwatch(watchhandler); - }; + const unwatch = async () => { + await changes.return?.(); + // Await oada.unwatch(watchhandler); + }; - if (Buffer.isBuffer(w)) { - throw new TypeError('body is a buffer, cannot call jobChange'); + // Create a synthetic change for the current state of things when the job starts + await makeSyntheticChange({ + jobId, + log, + oada, + unwatch, + }); + + for await (const change of changes) { + const result = await jobChange({ jobId, log, oada, c: change, unwatch }); + if (result !== undefined) { + return result; } + } + } catch (cError: unknown) { + jobs.dec(); + errors.inc(); + throw cError; + } +} - await jobChange({ - path: '', - body: w, - type: 'merge', - }); +async function makeSyntheticChange({ + jobId, + log, + oada, + unwatch, +}: { + jobId: string, + log: Logger, + oada: OADAClient, + unwatch: () => Promise, +}): Promise{ + const { data } = await oada.get({ + path: `/${jobId}`, + }); + // Initially just the original job is the "body" for a synthetic change + const w = data as Change['body']; - for await (const change of changes) { - await jobChange(change); - } - } catch (cError: unknown) { - jobs.dec(); - errors.inc(); - reject(cError); - } // Have to actually reject the promise + if (Buffer.isBuffer(w)) { + throw new TypeError('body is a buffer, cannot call jobChange'); + } + + await jobChange({ + jobId, + log, + oada, + c:{ + path: '', + body: w, + type: 'merge', + }, + unwatch }); -}; +} async function pushSharesForFacility({ facilityid, @@ -989,15 +982,15 @@ export async function startJobCreator({ // Watch primary user's documents // eslint-disable-next-line no-new - const selfDocTypesWatch = new ListWatch({ + const selfDocumentsTypesWatch = new ListWatch({ path, conn: con, resume: false, onNewList: AssumeState.New, tree: documentTypeTree }); - selfDocTypesWatch.on(ChangeType.ItemAdded, documentTypeAdded()); - process.on('beforeExit', async () => selfDocTypesWatch.stop()); + selfDocumentsTypesWatch.on(ChangeType.ItemAdded, documentTypeAdded()); + process.on('beforeExit', async () => selfDocumentsTypesWatch.stop()); // eslint-disable-next-line no-inner-declarations async function cleanupBrokenLinks() { @@ -1008,11 +1001,11 @@ export async function startJobCreator({ }); const { data: jobs = {} } = (await con.get({ path: pending, - })) as unknown as { data: Jobs }; + })) as unknown as { data: Record }; let count = 0; await Promise.all( - Object.entries(jobs).map(async ([key, job]) => { + Object.entries(jobs).map(async ([key, job]) => { if (key.startsWith('_')) { return; } @@ -1085,10 +1078,14 @@ export async function startJobCreator({ //persistInterval: PERSIST_INTERVAL, }); docTypeWatch.on(ChangeType.ItemAdded, documentAdded(docType, masterid)); + /* + We cannot use this one unless we can identify our own target-helper-induced changes + and filter them out of being handled here. docTypeWatch.on( ChangeType.ItemChanged, documentAdded(docType, masterid) ); + */ process.on('beforeExit', async () => docTypeWatch.stop()); }; }