From 52801b881400d9a4b72a05e5f29eb6a92d01605c Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 21 Jan 2025 20:32:22 +0100 Subject: [PATCH] feat(sandbox): add support for getChildrenValues --- package.json | 2 +- src/classes/child-processor.ts | 58 ++++++++- src/classes/main-base.ts | 8 +- src/classes/sandbox.ts | 64 ++++++---- src/enums/child-command.ts | 1 + src/enums/parent-command.ts | 1 + src/interfaces/child-message.ts | 1 + src/interfaces/index.ts | 1 + src/interfaces/receiver.ts | 4 + .../fixture_processor_get_children_values.js | 10 ++ ...ure_processor_get_children_values_child.js | 9 ++ tests/test_sandboxed_process.ts | 113 ++++++++++++++++++ 12 files changed, 235 insertions(+), 37 deletions(-) create mode 100644 src/interfaces/receiver.ts create mode 100644 tests/fixtures/fixture_processor_get_children_values.js create mode 100644 tests/fixtures/fixture_processor_get_children_values_child.js diff --git a/package.json b/package.json index b0b42e9af9..09c3397880 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,7 @@ "pretty:quick": "pretty-quick --ignore-path ./.eslintignore --staged", "semantic-release": "semantic-release", "semantic-release-prepare": "ts-node tools/semantic-release-prepare", - "test": "ts-mocha -p tsconfig-cjs.json --config ./.mocharc.js", + "test": "NODE_ENV=test ts-mocha -p tsconfig-cjs.json --config ./.mocharc.js", "test:watch": "ts-mocha -p tsconfig-cjs.json --paths 'tests/test_*.ts' -w --watch-extensions ts", "transform:commands": "node ./scripts/commandTransform.js ./rawScripts ./src/scripts", "tsc": "tsc", diff --git a/src/classes/child-processor.ts b/src/classes/child-processor.ts index 5ae835bc6d..ec608d2b71 100644 --- a/src/classes/child-processor.ts +++ b/src/classes/child-processor.ts @@ -1,5 +1,5 @@ import { ParentCommand } from '../enums'; -import { SandboxedJob } from '../interfaces'; +import { SandboxedJob, Receiver } from '../interfaces'; import { JobJsonSandbox } from '../types'; import { errorToJSON } from '../utils'; @@ -10,6 +10,8 @@ enum ChildStatus { Errored, } +const RESPONSE_TIMEOUT = process.env.NODE_ENV === 'test' ? 100 : 5_000; + /** * ChildProcessor * @@ -22,7 +24,10 @@ export class ChildProcessor { public processor: any; public currentJobPromise: Promise | undefined; - constructor(private send: (msg: any) => Promise) {} + constructor( + private send: (msg: any) => Promise, + private receiver: Receiver, + ) {} public async init(processorFile: string): Promise { let processor; @@ -120,7 +125,7 @@ export class ChildProcessor { opts: job.opts, returnValue: JSON.parse(job.returnvalue || '{}'), /* - * Emulate the real job `updateProgress` function, should works as `progress` function. + * Proxy `updateProgress` function, should works as `progress` function. */ async updateProgress(progress: number | object) { // Locally store reference to new progress value @@ -133,7 +138,7 @@ export class ChildProcessor { }); }, /* - * Emulate the real job `log` function. + * Proxy job `log` function. */ log: async (row: any) => { await send({ @@ -142,7 +147,7 @@ export class ChildProcessor { }); }, /* - * Emulate the real job `moveToDelayed` function. + * Proxy `moveToDelayed` function. */ moveToDelayed: async (timestamp: number, token?: string) => { await send({ @@ -151,7 +156,7 @@ export class ChildProcessor { }); }, /* - * Emulate the real job `updateData` function. + * Proxy `updateData` function. */ updateData: async (data: any) => { await send({ @@ -160,8 +165,49 @@ export class ChildProcessor { }); wrappedJob.data = data; }, + + /** + * Proxy `getChildrenValues` function. + */ + getChildrenValues: async () => { + const requestId = Math.random().toString(36).substring(2, 15); + await send({ + requestId, + cmd: ParentCommand.GetChildrenValues, + }); + + return waitResponse( + requestId, + this.receiver, + RESPONSE_TIMEOUT, + 'getChildrenValues', + ); + }, }; return wrappedJob; } } + +const waitResponse = async ( + requestId: string, + receiver: Receiver, + timeout: number, + cmd: string, +) => { + return new Promise((resolve, reject) => { + const listener = (msg: { requestId: string; value: any }) => { + if (msg.requestId === requestId) { + resolve(msg.value); + receiver.off('message', listener); + } + }; + receiver.on('message', listener); + + setTimeout(() => { + receiver.off('message', listener); + + reject(new Error(`TimeoutError: ${cmd} timed out in (${timeout}ms)`)); + }, timeout); + }); +}; diff --git a/src/classes/main-base.ts b/src/classes/main-base.ts index 28d1f6e551..46ed9e3760 100644 --- a/src/classes/main-base.ts +++ b/src/classes/main-base.ts @@ -5,12 +5,10 @@ import { ChildProcessor } from './child-processor'; import { ParentCommand, ChildCommand } from '../enums'; import { errorToJSON, toString } from '../utils'; +import { Receiver } from '../interfaces'; -export default ( - send: (msg: any) => Promise, - receiver: { on: (evt: 'message', cb: (msg: any) => void) => void }, -) => { - const childProcessor = new ChildProcessor(send); +export default (send: (msg: any) => Promise, receiver: Receiver) => { + const childProcessor = new ChildProcessor(send, receiver); receiver?.on('message', async msg => { try { diff --git a/src/classes/sandbox.ts b/src/classes/sandbox.ts index a6caad6d12..d53aa17954 100644 --- a/src/classes/sandbox.ts +++ b/src/classes/sandbox.ts @@ -28,32 +28,46 @@ const sandbox = ( child.on('exit', exitHandler); msgHandler = async (msg: ChildMessage) => { - switch (msg.cmd) { - case ParentCommand.Completed: - resolve(msg.value); - break; - case ParentCommand.Failed: - case ParentCommand.Error: { - const err = new Error(); - Object.assign(err, msg.value); - reject(err); - break; + try { + switch (msg.cmd) { + case ParentCommand.Completed: + resolve(msg.value); + break; + case ParentCommand.Failed: + case ParentCommand.Error: { + const err = new Error(); + Object.assign(err, msg.value); + reject(err); + break; + } + case ParentCommand.Progress: + await job.updateProgress(msg.value); + break; + case ParentCommand.Log: + await job.log(msg.value); + break; + case ParentCommand.MoveToDelayed: + await job.moveToDelayed( + msg.value?.timestamp, + msg.value?.token, + ); + break; + case ParentCommand.Update: + await job.updateData(msg.value); + break; + case ParentCommand.GetChildrenValues: + { + const value = await job.getChildrenValues(); + child.send({ + requestId: msg.requestId, + cmd: ChildCommand.GetChildrenValuesResponse, + value, + }); + } + break; } - case ParentCommand.Progress: - await job.updateProgress(msg.value); - break; - case ParentCommand.Log: - await job.log(msg.value); - break; - case ParentCommand.MoveToDelayed: - await job.moveToDelayed( - msg.value?.timestamp, - msg.value?.token, - ); - break; - case ParentCommand.Update: - await job.updateData(msg.value); - break; + } catch (err) { + reject(err); } }; diff --git a/src/enums/child-command.ts b/src/enums/child-command.ts index 1a721dae84..295f5b3673 100644 --- a/src/enums/child-command.ts +++ b/src/enums/child-command.ts @@ -2,4 +2,5 @@ export enum ChildCommand { Init, Start, Stop, + GetChildrenValuesResponse, } diff --git a/src/enums/parent-command.ts b/src/enums/parent-command.ts index d87a4e24f0..fe777665cb 100644 --- a/src/enums/parent-command.ts +++ b/src/enums/parent-command.ts @@ -8,4 +8,5 @@ export enum ParentCommand { MoveToDelayed, Progress, Update, + GetChildrenValues, } diff --git a/src/interfaces/child-message.ts b/src/interfaces/child-message.ts index 4a81bbe180..fbc2b25fa2 100644 --- a/src/interfaces/child-message.ts +++ b/src/interfaces/child-message.ts @@ -2,6 +2,7 @@ import { ParentCommand } from '../enums/parent-command'; export interface ChildMessage { cmd: ParentCommand; + requestId?: string; value?: any; err?: Record; } diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 44a4945006..88c57740de 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -26,3 +26,4 @@ export * from './sandboxed-job'; export * from './sandboxed-options'; export * from './worker-options'; export * from './telemetry'; +export * from './receiver'; diff --git a/src/interfaces/receiver.ts b/src/interfaces/receiver.ts new file mode 100644 index 0000000000..24c6a48b6b --- /dev/null +++ b/src/interfaces/receiver.ts @@ -0,0 +1,4 @@ +export interface Receiver { + on: (evt: 'message', cb: (msg: any) => void) => void; + off: (evt: 'message', cb: (msg: any) => void) => void; +} diff --git a/tests/fixtures/fixture_processor_get_children_values.js b/tests/fixtures/fixture_processor_get_children_values.js new file mode 100644 index 0000000000..58e2741ff0 --- /dev/null +++ b/tests/fixtures/fixture_processor_get_children_values.js @@ -0,0 +1,10 @@ +/** + * A processor file to be used in tests. + * + */ +'use strict'; + +module.exports = async function (job) { + const values = await job.getChildrenValues(); + return values; +}; diff --git a/tests/fixtures/fixture_processor_get_children_values_child.js b/tests/fixtures/fixture_processor_get_children_values_child.js new file mode 100644 index 0000000000..38a644fced --- /dev/null +++ b/tests/fixtures/fixture_processor_get_children_values_child.js @@ -0,0 +1,9 @@ +/** + * A processor file to be used in tests. + * + */ +'use strict'; + +module.exports = function (job) { + return { childResult: 'bar' }; +}; diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index e10c67a06c..63c67891bd 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -768,6 +768,119 @@ function sandboxProcessTests( await worker.close(); }); + it('can get children values by calling getChildrenValues', async () => { + const childJobId = 'child-job-id'; + const childProcessFile = + __dirname + '/fixtures/fixture_processor_get_children_values_child.js'; + const parentProcessFile = + __dirname + '/fixtures/fixture_processor_get_children_values.js'; + const parentQueueName = `parent-queue-${v4()}`; + + const parentWorker = new Worker(parentQueueName, parentProcessFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads, + }); + + const childWorker = new Worker(queueName, childProcessFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads, + }); + + const parentCompleting = new Promise((resolve, reject) => { + parentWorker.on('completed', async (job: Job, value: any) => { + try { + expect(value).to.be.eql({ + [`bull:${queueName}:${childJobId}`]: { childResult: 'bar' }, + }); + await parentWorker.close(); + resolve(); + } catch (err) { + await parentWorker.close(); + reject(err); + } + }); + }); + + const flow = new FlowProducer({ connection, prefix }); + await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + opts: { jobId: 'job-id' }, + children: [ + { name: 'child-job', queueName, opts: { jobId: childJobId } }, + ], + }); + + await parentCompleting; + await parentWorker.close(); + await childWorker.close(); + await flow.close(); + }); + + it('will fail job if calling getChildrenValues is too slow', async () => { + // Mockup Job.getChildrenValues to be slow + const getChildrenValues = Job.prototype.getChildrenValues; + Job.prototype.getChildrenValues = async function () { + await delay(50000); + return getChildrenValues.call(this); + }; + + const childJobId = 'child-job-id'; + const childProcessFile = + __dirname + '/fixtures/fixture_processor_get_children_values_child.js'; + const parentProcessFile = + __dirname + '/fixtures/fixture_processor_get_children_values.js'; + const parentQueueName = `parent-queue-${v4()}`; + + const parentWorker = new Worker(parentQueueName, parentProcessFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads, + }); + + const childWorker = new Worker(queueName, childProcessFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads, + }); + + const parentFailing = new Promise((resolve, reject) => { + parentWorker.on('failed', async (_, error: Error) => { + try { + expect(error.message).to.be.eql( + 'TimeoutError: getChildrenValues timed out in (100ms)', + ); + await parentWorker.close(); + resolve(); + } catch (err) { + await parentWorker.close(); + reject(err); + } + }); + }); + + const flow = new FlowProducer({ connection, prefix }); + await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + opts: { jobId: 'job-id' }, + children: [ + { name: 'child-job', queueName, opts: { jobId: childJobId } }, + ], + }); + + await parentFailing; + await parentWorker.close(); + await childWorker.close(); + await flow.close(); + }); + it('should process and move to delayed', async () => { const processFile = __dirname + '/fixtures/fixture_processor_move_to_delayed.js';